diff --git a/README.md b/README.md index f7db6d4..912ac43 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,18 @@ skill-issue init That's it. +### Starting from your history + +Already have Claude Code sessions? Don't start from zero: + +```bash +skill-issue analyze --all +``` + +The analyzer scans your conversation history and infers initial mastery scores. Questions you asked → weakness signal. Code you wrote → strength signal. Your knowledge graph starts calibrated to reality, not a blank slate. + +During `skill-issue init`, you'll be prompted to run this automatically. + --- ## Challenge Types @@ -90,6 +102,7 @@ Grounded in what was just built. No random trivia. | Command | What it does | |---|---| | `skill-issue init` | Onboarding + profile setup | +| `skill-issue analyze` | Bootstrap mastery from Claude Code history | | `skill-issue stats` | XP, level, streak, topic breakdown | | `skill-issue graph show --domain ` | ASCII viz | | `skill-issue graph weak --domain ` | Top priority nodes | diff --git a/skill_issue/analyzer.py b/skill_issue/analyzer.py new file mode 100644 index 0000000..4b1b990 --- /dev/null +++ b/skill_issue/analyzer.py @@ -0,0 +1,486 @@ +#!/usr/bin/env python3 +""" +Retroactive knowledge bootstrap analyzer. + +Scans Claude Code session history (JSONL files) and infers initial mastery scores +based on conversation patterns — questions indicate weakness, assertions indicate strength. 
# Claude Code session storage location: one sub-directory per project,
# each holding that project's *.jsonl session transcripts.
CLAUDE_PROJECTS_DIR = Path.home() / ".claude" / "projects"

# Question markers indicating uncertainty/weakness.
# Regular expressions; classify_message_intent() runs re.search() with each
# one against the lower-cased message, so they are written in lower case.
QUESTION_MARKERS = [
    r"\bwhat is\b",
    r"\bhow does\b",
    r"\bcan you explain\b",
    r"\bwhy does\b",
    r"\bi don'?t understand\b",
    r"\bhelp me understand\b",
    r"\bwhat'?s the difference\b",
    r"\bi'?m confused about\b",
    r"\bwhat does .+ mean\b",
    r"\bcan you help me\b",
    r"\bi'?m not sure\b",
    r"\bwhat'?s .+ for\b",
    r"\bhow do (i|you|we)\b",
    r"\bwhy is\b",
    r"\bwhy do\b",
    # Searched without re.MULTILINE, so "$" anchors to the end of the whole
    # message: only a message that ends with "?" matches.
    r"\?$",  # Ends with question mark
]

# Confidence markers indicating strength.
# Same matching rules as above, except that classify_message_intent() passes
# re.MULTILINE for these patterns.
CONFIDENCE_MARKERS = [
    # With re.MULTILINE "^" matches at the start of any line, so a fenced
    # code block anywhere in the message counts.
    r"^```",  # Code blocks (user sharing code they wrote)
    r"\bi implemented\b",
    r"\bi wrote\b",
    r"\bi built\b",
    r"\bi created\b",
    r"\bi fixed\b",
    r"\bhere'?s my\b",
    r"\blet me show you\b",
    r"\bthis is how\b",
]
def extract_messages(session_path: Path) -> list[dict]:
    """
    Extract user and assistant messages from a session JSONL file.

    Every line is parsed on its own. A line is skipped when it is blank or
    not valid JSON, when it does not decode to a JSON object, when its
    "type" is not "user" or "assistant", or when it carries no text. One
    malformed record therefore never discards the rest of the session.

    Message content may be a plain string or a list of content blocks; for
    a list, only blocks of type "text" with a string "text" value are used
    and they are joined with newlines.

    Args:
        session_path: Path to the session ``.jsonl`` file.

    Returns:
        List of dicts with keys: role, text, timestamp (in file order).
        An empty list if the file cannot be opened or read.
    """
    messages = []

    try:
        # Read as UTF-8 instead of the locale default; undecodable bytes are
        # replaced so a single bad byte cannot abort the whole file.
        with open(session_path, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue

                # Valid JSON that is not an object (list, string, number...)
                # cannot be a message record.
                if not isinstance(obj, dict):
                    continue

                msg_type = obj.get("type")
                if msg_type not in ("user", "assistant"):
                    continue

                # "message" may be missing or null in some records.
                message = obj.get("message")
                content = message.get("content", "") if isinstance(message, dict) else ""

                # Handle content that can be string or list
                if isinstance(content, str):
                    text = content
                elif isinstance(content, list):
                    text = "\n".join(
                        block["text"]
                        for block in content
                        if isinstance(block, dict)
                        and block.get("type") == "text"
                        and isinstance(block.get("text"), str)
                    )
                else:
                    text = ""

                text = text.strip()
                if text:
                    messages.append({
                        "role": msg_type,
                        "text": text,
                        "timestamp": obj.get("timestamp", ""),
                    })
    except OSError:
        # Missing or unreadable file: report "no messages" like before.
        return []

    return messages
def analyze_sessions(
    sessions: list[Path],
    domains: list[str],
    max_sessions: int = 50,
) -> dict[str, dict[str, dict]]:
    """
    Score knowledge-graph concepts from Claude Code session transcripts.

    Only the first ``max_sessions`` entries of ``sessions`` are read. Each
    user message is classified with classify_message_intent() and scanned,
    per domain, with detect_concepts_in_text(). The assistant message that
    directly follows it (if any) is scanned too.

    Returns: {domain: {node_id: {"score": float, "signals": list}}}

    Score adjustment per mention:
    - Concept in a user question: -0.15, signal "question"
    - Concept in a user assertion/code: +0.20, signal "assertion"
    - Concept in any other user message: +0.05, signal "neutral_user"
    - Concept only in Claude's reply: +0.05, signal "claude_explained"
    - Final score clamped to [0.0, 0.8] after all sessions are processed
    """
    # intent -> (score delta, signal label); other intents count as neutral
    by_intent = {
        "question": (-0.15, "question"),
        "assertion": (0.20, "assertion"),
    }
    neutral_user = (0.05, "neutral_user")
    claude_explained = (0.05, "claude_explained")

    scores = {name: {} for name in domains}

    def record(domain_name, node_id, adjustment):
        # Create the entry on first sight, then accumulate score and signal.
        delta, label = adjustment
        entry = scores[domain_name].setdefault(node_id, {"score": 0.0, "signals": []})
        entry["score"] += delta
        entry["signals"].append(label)

    for path in sessions[:max_sessions]:
        transcript = extract_messages(path)

        for position, current in enumerate(transcript):
            if current["role"] != "user":
                continue

            asked = current["text"]
            user_adjustment = by_intent.get(classify_message_intent(asked), neutral_user)

            # The reply only counts when it is the very next message.
            following = position + 1
            has_reply = following < len(transcript) and transcript[following]["role"] == "assistant"
            answered = transcript[following]["text"] if has_reply else ""

            for domain_name in domains:
                from_user = detect_concepts_in_text(asked, domain_name)
                from_claude = detect_concepts_in_text(answered, domain_name)

                for node_id in from_user:
                    record(domain_name, node_id, user_adjustment)

                # Concepts Claude brought up that the user did not mention.
                for node_id in from_claude:
                    if node_id not in from_user:
                        record(domain_name, node_id, claude_explained)

    # Clamp all scores to [0.0, 0.8]
    for concept_map in scores.values():
        for entry in concept_map.values():
            entry["score"] = max(0.0, min(0.8, entry["score"]))

    return scores
def format_analysis_report(result: dict) -> str:
    """
    Render the summary dict returned by run_analysis() as plain text.

    The "no_sessions" and "no_domains" statuses each produce a single fixed
    sentence. Otherwise the report has a two-line header, then one section
    per domain that has concepts (domains in alphabetical order). A section
    lists at most ten concepts, highest score first and ties broken by node
    id, each with an 8-cell bar, the score, and signal counts. The final
    line says whether the scores were applied.
    """
    status = result["status"]
    if status == "no_sessions":
        return "No Claude Code sessions found. Start some conversations first!"
    if status == "no_domains":
        return "No knowledge graph domains available."

    report = [
        f"Analyzed {result['sessions_analyzed']} session(s)",
        f"Detected {result['concepts_detected']} concept(s) across {len(result['domains_analyzed'])} domain(s)",
        "",
    ]

    per_domain = result.get("analysis", {})
    for domain_name in sorted(per_domain):
        entries = per_domain[domain_name]
        if not entries:
            continue

        report.append(f"─── {domain_name} ───")

        # Highest score first; node id keeps the order stable on ties.
        ranked = sorted(entries.items(), key=lambda pair: (-pair[1]["score"], pair[0]))

        for node_id, info in ranked[:10]:  # Top 10 per domain
            score = info["score"]
            tags = info["signals"]

            # One filled cell per 0.1 of score, on an 8-cell track.
            filled = int(score * 10)
            bar = "█" * filled + "░" * (8 - filled)

            questions = tags.count("question")
            assertions = tags.count("assertion")
            others = len(tags) - questions - assertions

            # Only non-zero counts are shown.
            parts = (
                (questions, f" -{questions}q"),
                (assertions, f" +{assertions}a"),
                (others, f" ~{others}"),
            )
            suffix = "".join(piece for count, piece in parts if count)

            report.append(f" {node_id:<25} [{bar}] {score:.2f}{suffix}")

        hidden = len(entries) - 10
        if hidden > 0:
            report.append(f" ... and {hidden} more")
        report.append("")

    if result.get("applied"):
        report.append("✓ Scores applied to knowledge state")
    else:
        report.append("(dry run — no changes applied)")

    return "\n".join(report)
def cmd_analyze(args):
    """
    Handle `skill-issue analyze`.

    Prints a banner, runs the analyzer with the parsed CLI flags
    (``args.all``, ``args.max_sessions``, ``args.dry_run``) and prints the
    formatted report.
    """
    # Imported lazily so the analyzer is only loaded for this command.
    from skill_issue.analyzer import format_analysis_report, run_analysis

    print("\n🔍 Analyzing Claude Code session history...\n")

    options = {
        "all_projects": args.all,
        "max_sessions": args.max_sessions,
        "dry_run": args.dry_run,
    }
    report = format_analysis_report(run_analysis(**options))

    print(report)
@pytest.fixture
def isolated_env(monkeypatch, tmp_path):
    """
    Point knowledge_state and analyzer at throwaway directories.

    Creates a skill-issue directory, a knowledge-graphs directory and a
    Claude projects directory under ``tmp_path``, patches the module-level
    path constants to use them, and returns the three paths in a dict.
    """
    env = {
        "skill_dir": tmp_path / ".skill-issue",
        "graphs_dir": tmp_path / "knowledge_graphs",
        "claude_dir": tmp_path / ".claude" / "projects",
    }
    env["skill_dir"].mkdir()
    env["graphs_dir"].mkdir()
    env["claude_dir"].mkdir(parents=True)

    # (module, attribute, replacement) — undone by monkeypatch at teardown.
    patches = [
        (knowledge_state, "SKILL_DIR", env["skill_dir"]),
        (knowledge_state, "KNOWLEDGE_STATE_PATH", env["skill_dir"] / "knowledge_state.json"),
        (knowledge_state, "GRAPHS_DIR", env["graphs_dir"]),
        (analyzer, "CLAUDE_PROJECTS_DIR", env["claude_dir"]),
    ]
    for module, attribute, replacement in patches:
        monkeypatch.setattr(module, attribute, replacement)

    return env
class TestClassifyMessageIntent:
    """Tests for classify_message_intent function."""

    def test_question_with_what_is(self):
        verdict = analyzer.classify_message_intent("What is gradient descent?")
        assert verdict == "question"

    def test_question_with_how_does(self):
        verdict = analyzer.classify_message_intent("How does backpropagation work?")
        assert verdict == "question"

    def test_question_with_question_mark(self):
        verdict = analyzer.classify_message_intent("Can you help me with this?")
        assert verdict == "question"

    def test_question_with_confusion(self):
        # No question mark: the "I don't understand" marker alone decides.
        verdict = analyzer.classify_message_intent("I don't understand regularization")
        assert verdict == "question"

    def test_assertion_with_code_block(self):
        message = "Here's my implementation:\n```python\ndef train(): pass\n```"
        verdict = analyzer.classify_message_intent(message)
        assert verdict == "assertion"

    def test_assertion_with_i_implemented(self):
        verdict = analyzer.classify_message_intent("I implemented the loss function")
        assert verdict == "assertion"

    def test_neutral_simple_statement(self):
        # Neither question nor confidence markers present.
        verdict = analyzer.classify_message_intent("Let's work on this next")
        assert verdict == "neutral"
class TestAnalyzeSessions:
    """Tests for analyze_sessions function."""

    def test_question_decreases_score(self, isolated_env):
        domain = f"analyze-q-{_unique_id()}"
        node = {"id": "backpropagation", "name": "Backpropagation", "aliases": ["backprop"]}
        _write_graph(isolated_env["graphs_dir"], domain, {"nodes": [node]})

        # The user asks about backprop; Claude answers.
        conversation = [
            {"role": "user", "text": "What is backpropagation?"},
            {"role": "assistant", "text": "Backpropagation is the chain rule applied to neural networks..."},
        ]
        session = _create_session(isolated_env["claude_dir"], "-test-project", conversation)

        scored = analyzer.analyze_sessions([session], [domain])[domain]

        assert "backpropagation" in scored
        # A single question is -0.15, which the final clamp raises to 0.0.
        assert scored["backpropagation"]["score"] == 0.0
        assert "question" in scored["backpropagation"]["signals"]

    def test_assertion_increases_score(self, isolated_env):
        domain = f"analyze-a-{_unique_id()}"
        node = {"id": "loss-functions", "name": "Loss Functions", "aliases": ["cross-entropy", "MSE"]}
        _write_graph(isolated_env["graphs_dir"], domain, {"nodes": [node]})

        # The user shares code they wrote.
        conversation = [
            {"role": "user", "text": "I implemented the cross-entropy loss function:\n```python\ndef loss(): pass\n```"},
            {"role": "assistant", "text": "Looks good!"},
        ]
        session = _create_session(isolated_env["claude_dir"], "-test-project", conversation)

        scored = analyzer.analyze_sessions([session], [domain])[domain]

        assert "loss-functions" in scored
        assert scored["loss-functions"]["score"] > 0.0
        assert "assertion" in scored["loss-functions"]["signals"]

    def test_claude_only_mention_slight_positive(self, isolated_env):
        domain = f"analyze-c-{_unique_id()}"
        node = {"id": "regularization", "name": "Regularization", "aliases": ["L1", "L2"]}
        _write_graph(isolated_env["graphs_dir"], domain, {"nodes": [node]})

        # Only Claude's reply names the concept.
        conversation = [
            {"role": "user", "text": "How can I prevent overfitting?"},
            {"role": "assistant", "text": "You can use regularization like L1 or L2 penalties."},
        ]
        session = _create_session(isolated_env["claude_dir"], "-test-project", conversation)

        scored = analyzer.analyze_sessions([session], [domain])[domain]

        assert "regularization" in scored
        assert scored["regularization"]["score"] > 0.0
        assert "claude_explained" in scored["regularization"]["signals"]

    def test_score_clamped_to_max(self, isolated_env):
        domain = f"clamp-test-{_unique_id()}"
        node = {"id": "attention", "name": "Attention", "aliases": []}
        _write_graph(isolated_env["graphs_dir"], domain, {"nodes": [node]})

        # Ten assertion signals would sum well past the 0.8 ceiling.
        conversation = []
        for i in range(10):
            conversation.extend([
                {"role": "user", "text": f"I built attention mechanism {i}:\n```code\n```"},
                {"role": "assistant", "text": "Great!"},
            ])

        session = _create_session(isolated_env["claude_dir"], "-test-project", conversation)
        scored = analyzer.analyze_sessions([session], [domain])[domain]

        assert scored["attention"]["score"] <= 0.8
class TestExtractMessages:
    """Tests for extract_messages function."""

    def test_extracts_string_content(self, isolated_env):
        transcript = [{"role": "user", "text": "Hello world"}]
        session = _create_session(isolated_env["claude_dir"], "-test", transcript)

        extracted = analyzer.extract_messages(session)

        assert len(extracted) == 1
        only = extracted[0]
        assert only["text"] == "Hello world"
        assert only["role"] == "user"

    def test_handles_list_content(self, isolated_env):
        # Content given as a list of text blocks rather than a plain string.
        project_dir = isolated_env["claude_dir"] / "-test-list"
        project_dir.mkdir()
        session_path = project_dir / "session.jsonl"

        record = {
            "type": "user",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": {
                "content": [
                    {"type": "text", "text": "First part"},
                    {"type": "text", "text": "Second part"},
                ]
            },
        }
        session_path.write_text(json.dumps(record))

        extracted = analyzer.extract_messages(session_path)

        assert len(extracted) == 1
        combined = extracted[0]["text"]
        assert "First part" in combined
        assert "Second part" in combined

    def test_skips_non_message_entries(self, isolated_env):
        project_dir = isolated_env["claude_dir"] / "-test-skip"
        project_dir.mkdir()
        session_path = project_dir / "session.jsonl"

        # Only the middle record has type "user"/"assistant".
        records = [
            {"type": "progress", "data": {}},
            {"type": "user", "message": {"content": "Real message"}},
            {"type": "queue-operation", "operation": "dequeue"},
        ]
        session_path.write_text("\n".join(json.dumps(record) for record in records))

        extracted = analyzer.extract_messages(session_path)

        assert len(extracted) == 1
        assert extracted[0]["text"] == "Real message"