From c90202c89258354ddf8d3059a867f36c7b4187fd Mon Sep 17 00:00:00 2001 From: agi-worker Date: Sat, 4 Apr 2026 17:38:30 +0800 Subject: [PATCH] test: add unit tests for TodoManager, SkillLoader, micro_compact, and TaskManager Add 55 unit tests covering core harness components: - TodoManager (s03): validation, defaults, render, boundary conditions - SkillLoader (s05): frontmatter parsing, skill loading, descriptions - micro_compact/estimate_tokens (s06): compression logic, preservation rules - TaskManager (s07): CRUD, dependency resolution, ID continuity Also adds shared conftest.py with mocked anthropic/dotenv module loader, and updates CI to run full test suite with pyyaml dependency. --- .github/workflows/test.yml | 6 +- tests/conftest.py | 67 ++++++++++++ tests/test_context_compact.py | 124 +++++++++++++++++++++++ tests/test_skill_loader.py | 121 ++++++++++++++++++++++ tests/test_task_manager.py | 186 ++++++++++++++++++++++++++++++++++ tests/test_todo_manager.py | 126 +++++++++++++++++++++++ 6 files changed, 627 insertions(+), 3 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_context_compact.py create mode 100644 tests/test_skill_loader.py create mode 100644 tests/test_task_manager.py create mode 100644 tests/test_todo_manager.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c6f70e1c0..001866229 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,10 +18,10 @@ jobs: python-version: "3.11" - name: Install dependencies - run: pip install anthropic python-dotenv pytest + run: pip install anthropic python-dotenv pyyaml pytest - - name: Run Python smoke tests - run: python -m pytest tests/test_agents_smoke.py -q + - name: Run Python tests + run: python -m pytest tests/ -q web-build: runs-on: ubuntu-latest diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 000000000..2bd52049b --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,67 @@ +"""Shared fixtures for loading agent modules with mocked dependencies.""" +from __future__ import annotations + +import importlib.util +import os +import sys +import types +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +AGENTS_DIR = REPO_ROOT / "agents" + + +def load_agent_module(module_file: str, temp_cwd: Path): + """Load an agent module with mocked anthropic/dotenv so it doesn't need real API keys. + + Args: + module_file: filename inside agents/, e.g. "s03_todo_write.py" + temp_cwd: temporary working directory (avoids polluting real filesystem) + + Returns: + The loaded module object. + """ + module_path = AGENTS_DIR / module_file + + # Fake anthropic module + fake_anthropic = types.ModuleType("anthropic") + + class FakeAnthropic: + def __init__(self, *args, **kwargs): + self.messages = types.SimpleNamespace(create=None) + + setattr(fake_anthropic, "Anthropic", FakeAnthropic) + + # Fake dotenv module + fake_dotenv = types.ModuleType("dotenv") + setattr(fake_dotenv, "load_dotenv", lambda **kw: None) + + # Save originals + prev_anthropic = sys.modules.get("anthropic") + prev_dotenv = sys.modules.get("dotenv") + prev_cwd = Path.cwd() + + spec = importlib.util.spec_from_file_location( + f"agent_{module_file.replace('.py', '')}", module_path + ) + if spec is None or spec.loader is None: + raise RuntimeError(f"Unable to load {module_path}") + module = importlib.util.module_from_spec(spec) + + sys.modules["anthropic"] = fake_anthropic + sys.modules["dotenv"] = fake_dotenv + try: + os.chdir(temp_cwd) + os.environ.setdefault("MODEL_ID", "test-model") + spec.loader.exec_module(module) + return module + finally: + os.chdir(prev_cwd) + if prev_anthropic is None: + sys.modules.pop("anthropic", None) + else: + sys.modules["anthropic"] = prev_anthropic + if prev_dotenv is None: + sys.modules.pop("dotenv", None) + else: + sys.modules["dotenv"] = prev_dotenv diff --git a/tests/test_context_compact.py b/tests/test_context_compact.py new file mode 100644 index 000000000..7b17d6e4e --- /dev/null +++ b/tests/test_context_compact.py @@ -0,0 +1,124 @@ +"""Unit tests for micro_compact and estimate_tokens (s06_context_compact.py).""" +from __future__ import annotations + +import tempfile +import types +from pathlib import Path + +import pytest + +from conftest import load_agent_module + + +@pytest.fixture() +def compact_module(): + with tempfile.TemporaryDirectory() as tmp: + module = load_agent_module("s06_context_compact.py", Path(tmp)) + yield module + + +# -- estimate_tokens -- + +class TestEstimateTokens: + def test_basic(self, compact_module): + msgs = [{"role": "user", "content": "a" * 400}] + result = compact_module.estimate_tokens(msgs) + assert result == len(str(msgs)) // 4 + + def test_empty_messages(self, compact_module): + assert compact_module.estimate_tokens([]) == len(str([])) // 4 + + +# -- micro_compact -- + +def _tool_use_block(tool_id: str, name: str): + """Create a mock tool_use block (SimpleNamespace mimicking Anthropic SDK object).""" + return types.SimpleNamespace(type="tool_use", id=tool_id, name=name, input={}) + + +def _tool_result(tool_id: str, content: str): + """Create a tool_result dict.""" + return {"type": "tool_result", "tool_use_id": tool_id, "content": content} + + +def _make_messages(n_results: int, tool_name: str = "bash"): + """Build a message list with n tool_use/tool_result pairs. + + Each pair is: assistant message with tool_use block, then user message with tool_result. + """ + messages = [] + for i in range(n_results): + tid = f"tool_{i}" + messages.append({ + "role": "assistant", + "content": [_tool_use_block(tid, tool_name)], + }) + messages.append({ + "role": "user", + "content": [_tool_result(tid, f"Output line {'x' * 200} for call {i}")], + }) + return messages + + +class TestMicroCompact: + def test_few_results_unchanged(self, compact_module): + """With <= KEEP_RECENT results, nothing is compacted.""" + messages = _make_messages(3) # exactly KEEP_RECENT + original_contents = [ + messages[i]["content"][0]["content"] + for i in range(1, len(messages), 2) + ] + compact_module.micro_compact(messages) + for idx, i in enumerate(range(1, len(messages), 2)): + assert messages[i]["content"][0]["content"] == original_contents[idx] + + def test_clears_old_results(self, compact_module): + """With > KEEP_RECENT results, oldest are replaced with placeholder.""" + messages = _make_messages(5) + compact_module.micro_compact(messages) + # First 2 results (index 0,1) should be compacted + assert messages[1]["content"][0]["content"] == "[Previous: used bash]" + assert messages[3]["content"][0]["content"] == "[Previous: used bash]" + # Last 3 results should be preserved + assert messages[5]["content"][0]["content"].startswith("Output line") + assert messages[7]["content"][0]["content"].startswith("Output line") + assert messages[9]["content"][0]["content"].startswith("Output line") + + def test_preserves_read_file(self, compact_module): + """read_file results are never compacted.""" + messages = _make_messages(5, tool_name="read_file") + compact_module.micro_compact(messages) + # All should be preserved since tool_name is read_file + for i in range(1, len(messages), 2): + assert messages[i]["content"][0]["content"].startswith("Output line") + + def test_skips_short_content(self, compact_module): + """Content <= 100 chars is not compacted.""" + messages = [] + for i in range(5): + tid = f"tool_{i}" + messages.append({ + "role": "assistant", + "content": [_tool_use_block(tid, "bash")], + }) + messages.append({ + "role": "user", + "content": [_tool_result(tid, "short")], # <= 100 chars + }) + compact_module.micro_compact(messages) + # All should be preserved because content is short + for i in range(1, len(messages), 2): + assert messages[i]["content"][0]["content"] == "short" + + def test_unknown_tool_name(self, compact_module): + """When tool_use_id has no matching tool_use block, uses 'unknown'.""" + messages = [ + # No assistant message with matching tool_use + {"role": "user", "content": [_tool_result("orphan_0", "x" * 200)]}, + {"role": "user", "content": [_tool_result("orphan_1", "x" * 200)]}, + {"role": "user", "content": [_tool_result("orphan_2", "x" * 200)]}, + {"role": "user", "content": [_tool_result("orphan_3", "x" * 200)]}, + ] + compact_module.micro_compact(messages) + # First result should be compacted with "unknown" + assert messages[0]["content"][0]["content"] == "[Previous: used unknown]" diff --git a/tests/test_skill_loader.py b/tests/test_skill_loader.py new file mode 100644 index 000000000..825fb0e73 --- /dev/null +++ b/tests/test_skill_loader.py @@ -0,0 +1,121 @@ +"""Unit tests for SkillLoader (s05_skill_loading.py).""" +from __future__ import annotations + +import tempfile +from pathlib import Path + +import pytest + +from conftest import load_agent_module + + +@pytest.fixture() +def SkillLoader(): + with tempfile.TemporaryDirectory() as tmp: + module = load_agent_module("s05_skill_loading.py", Path(tmp)) + yield module.SkillLoader + + +def _make_skill(base_dir: Path, name: str, frontmatter: str, body: str): + """Helper: create a skills//SKILL.md file.""" + skill_dir = base_dir / name + skill_dir.mkdir(parents=True, exist_ok=True) + (skill_dir / "SKILL.md").write_text(f"---\n{frontmatter}\n---\n{body}") + + +# -- _parse_frontmatter -- + +class TestParseFrontmatter: + def test_valid_frontmatter(self, SkillLoader): + loader = SkillLoader(Path("/nonexistent")) + meta, body = loader._parse_frontmatter("---\nname: test\ndescription: A test\n---\nBody text here") + assert meta["name"] == "test" + assert meta["description"] == "A test" + assert body == "Body text here" + + def test_no_frontmatter(self, SkillLoader): + loader = SkillLoader(Path("/nonexistent")) + meta, body = loader._parse_frontmatter("Just plain text without frontmatter") + assert meta == {} + assert body == "Just plain text without frontmatter" + + def test_invalid_yaml(self, SkillLoader): + loader = SkillLoader(Path("/nonexistent")) + meta, body = loader._parse_frontmatter("---\n: [invalid yaml\n---\nBody") + assert meta == {} + assert body == "Body" + + +# -- _load_all -- + +class TestLoadAll: + def test_nonexistent_dir(self, SkillLoader): + loader = SkillLoader(Path("/does/not/exist")) + assert loader.skills == {} + + def test_empty_dir(self, SkillLoader, tmp_path): + loader = SkillLoader(tmp_path) + assert loader.skills == {} + + def test_loads_skill(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "pdf", "name: pdf\ndescription: PDF tools", "Process PDFs here") + loader = SkillLoader(tmp_path) + assert "pdf" in loader.skills + assert loader.skills["pdf"]["body"] == "Process PDFs here" + + def test_name_from_directory(self, SkillLoader, tmp_path): + """When frontmatter has no 'name', directory name is used.""" + _make_skill(tmp_path, "my-tool", "description: A tool", "Body") + loader = SkillLoader(tmp_path) + assert "my-tool" in loader.skills + + def test_multiple_skills(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "a", "name: a\ndescription: Skill A", "Body A") + _make_skill(tmp_path, "b", "name: b\ndescription: Skill B", "Body B") + loader = SkillLoader(tmp_path) + assert len(loader.skills) == 2 + + +# -- get_descriptions -- + +class TestGetDescriptions: + def test_no_skills(self, SkillLoader): + loader = SkillLoader(Path("/nonexistent")) + assert loader.get_descriptions() == "(no skills available)" + + def test_with_description(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "pdf", "name: pdf\ndescription: Process PDFs", "Body") + loader = SkillLoader(tmp_path) + desc = loader.get_descriptions() + assert "pdf: Process PDFs" in desc + + def test_with_tags(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "pdf", "name: pdf\ndescription: Process PDFs\ntags: utils", "Body") + loader = SkillLoader(tmp_path) + desc = loader.get_descriptions() + assert "[utils]" in desc + + def test_without_tags(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "pdf", "name: pdf\ndescription: Process PDFs", "Body") + loader = SkillLoader(tmp_path) + desc = loader.get_descriptions() + assert "[" not in desc + + +# -- get_content -- + +class TestGetContent: + def test_existing_skill(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "pdf", "name: pdf\ndescription: PDF", "PDF instructions") + loader = SkillLoader(tmp_path) + content = loader.get_content("pdf") + assert '' in content + assert "PDF instructions" in content + assert "" in content + + def test_unknown_skill(self, SkillLoader, tmp_path): + _make_skill(tmp_path, "pdf", "name: pdf\ndescription: PDF", "Body") + loader = SkillLoader(tmp_path) + result = loader.get_content("unknown") + assert "Error: Unknown skill 'unknown'" in result + assert "pdf" in result # lists available skills diff --git a/tests/test_task_manager.py b/tests/test_task_manager.py new file mode 100644 index 000000000..a276291cb --- /dev/null +++ b/tests/test_task_manager.py @@ -0,0 +1,186 @@ +"""Unit tests for TaskManager (s07_task_system.py).""" +from __future__ import annotations + +import json +import tempfile +from pathlib import Path + +import pytest + +from conftest import load_agent_module + + +@pytest.fixture() +def TaskManager(): + with tempfile.TemporaryDirectory() as tmp: + module = load_agent_module("s07_task_system.py", Path(tmp)) + yield module.TaskManager + + +# -- create -- + +class TestCreate: + def test_creates_task_with_defaults(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + result = json.loads(tm.create("Build feature")) + assert result["id"] == 1 + assert result["subject"] == "Build feature" + assert result["description"] == "" + assert result["status"] == "pending" + assert result["blockedBy"] == [] + assert result["owner"] == "" + + def test_sequential_ids(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + t1 = json.loads(tm.create("First")) + t2 = json.loads(tm.create("Second")) + t3 = json.loads(tm.create("Third")) + assert t1["id"] == 1 + assert t2["id"] == 2 + assert t3["id"] == 3 + + def test_persists_to_file(self, TaskManager, tmp_path): + tasks_dir = tmp_path / "tasks" + tm = TaskManager(tasks_dir) + tm.create("Persisted task") + assert (tasks_dir / "task_1.json").exists() + data = json.loads((tasks_dir / "task_1.json").read_text()) + assert data["subject"] == "Persisted task" + + def test_with_description(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + result = json.loads(tm.create("Task", "Detailed description")) + assert result["description"] == "Detailed description" + + +# -- get -- + +class TestGet: + def test_get_existing(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("My task") + result = json.loads(tm.get(1)) + assert result["subject"] == "My task" + + def test_get_nonexistent_raises(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + with pytest.raises(ValueError, match="Task 999 not found"): + tm.get(999) + + +# -- update -- + +class TestUpdate: + def test_update_status(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Task") + result = json.loads(tm.update(1, status="in_progress")) + assert result["status"] == "in_progress" + + def test_invalid_status_raises(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Task") + with pytest.raises(ValueError, match="Invalid status"): + tm.update(1, status="done") + + def test_add_blocked_by(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("First") + tm.create("Second") + result = json.loads(tm.update(2, add_blocked_by=[1])) + assert 1 in result["blockedBy"] + + def test_add_blocked_by_deduplicates(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("First") + tm.create("Second") + tm.update(2, add_blocked_by=[1]) + result = json.loads(tm.update(2, add_blocked_by=[1])) + assert result["blockedBy"].count(1) == 1 + + def test_remove_blocked_by(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("First") + tm.create("Second") + tm.update(2, add_blocked_by=[1]) + result = json.loads(tm.update(2, remove_blocked_by=[1])) + assert 1 not in result["blockedBy"] + + def test_remove_nonexistent_dependency_ok(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Task") + result = json.loads(tm.update(1, remove_blocked_by=[999])) + assert result["blockedBy"] == [] + + +# -- _clear_dependency -- + +class TestClearDependency: + def test_completing_clears_from_dependents(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Blocker") + tm.create("Blocked") + tm.update(2, add_blocked_by=[1]) + # Complete task 1 -> should remove 1 from task 2's blockedBy + tm.update(1, status="completed") + task2 = json.loads(tm.get(2)) + assert 1 not in task2["blockedBy"] + + def test_completing_clears_from_multiple_dependents(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Blocker") + tm.create("Dep A") + tm.create("Dep B") + tm.update(2, add_blocked_by=[1]) + tm.update(3, add_blocked_by=[1]) + tm.update(1, status="completed") + assert 1 not in json.loads(tm.get(2))["blockedBy"] + assert 1 not in json.loads(tm.get(3))["blockedBy"] + + +# -- list_all -- + +class TestListAll: + def test_empty(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + assert tm.list_all() == "No tasks." + + def test_with_items(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Alpha") + tm.create("Beta") + tm.update(1, status="completed") + output = tm.list_all() + assert "[x] #1: Alpha" in output + assert "[ ] #2: Beta" in output + + def test_shows_blocked_info(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("First") + tm.create("Second") + tm.update(2, add_blocked_by=[1]) + output = tm.list_all() + assert "blocked by:" in output + + def test_sorted_by_id(self, TaskManager, tmp_path): + tm = TaskManager(tmp_path / "tasks") + tm.create("Third") + tm.create("First") + output = tm.list_all() + lines = [l for l in output.splitlines() if l.strip()] + assert "#1" in lines[0] + assert "#2" in lines[1] + + +# -- ID continuity across instances -- + +class TestIdContinuity: + def test_new_instance_resumes_ids(self, TaskManager, tmp_path): + tasks_dir = tmp_path / "tasks" + tm1 = TaskManager(tasks_dir) + tm1.create("Task A") + tm1.create("Task B") + # New instance on same directory + tm2 = TaskManager(tasks_dir) + result = json.loads(tm2.create("Task C")) + assert result["id"] == 3 diff --git a/tests/test_todo_manager.py b/tests/test_todo_manager.py new file mode 100644 index 000000000..4901d57de --- /dev/null +++ b/tests/test_todo_manager.py @@ -0,0 +1,126 @@ +"""Unit tests for TodoManager (s03_todo_write.py).""" +from __future__ import annotations + +import tempfile +from pathlib import Path + +import pytest + +from conftest import load_agent_module + + +@pytest.fixture() +def TodoManager(): + with tempfile.TemporaryDirectory() as tmp: + module = load_agent_module("s03_todo_write.py", Path(tmp)) + yield module.TodoManager + + +# -- render() -- + +class TestRender: + def test_empty_renders_no_todos(self, TodoManager): + tm = TodoManager() + assert tm.render() == "No todos." + + def test_render_markers(self, TodoManager): + tm = TodoManager() + tm.update([ + {"id": "1", "text": "Plan", "status": "completed"}, + {"id": "2", "text": "Build", "status": "in_progress"}, + {"id": "3", "text": "Test", "status": "pending"}, + ]) + output = tm.render() + assert "[x] #1: Plan" in output + assert "[>] #2: Build" in output + assert "[ ] #3: Test" in output + + def test_render_completion_count(self, TodoManager): + tm = TodoManager() + tm.update([ + {"id": "1", "text": "Done", "status": "completed"}, + {"id": "2", "text": "Todo", "status": "pending"}, + ]) + assert "(1/2 completed)" in tm.render() + + +# -- update() happy path -- + +class TestUpdateHappyPath: + def test_single_item(self, TodoManager): + tm = TodoManager() + result = tm.update([{"id": "1", "text": "Hello", "status": "pending"}]) + assert "[ ] #1: Hello" in result + assert len(tm.items) == 1 + + def test_max_20_items(self, TodoManager): + tm = TodoManager() + items = [{"id": str(i), "text": f"Task {i}", "status": "pending"} for i in range(1, 21)] + result = tm.update(items) + assert "(0/20 completed)" in result + + def test_one_in_progress_allowed(self, TodoManager): + tm = TodoManager() + result = tm.update([ + {"id": "1", "text": "A", "status": "in_progress"}, + {"id": "2", "text": "B", "status": "pending"}, + ]) + assert "[>] #1: A" in result + + +# -- update() defaults -- + +class TestUpdateDefaults: + def test_default_status_is_pending(self, TodoManager): + tm = TodoManager() + tm.update([{"id": "1", "text": "Foo"}]) + assert tm.items[0]["status"] == "pending" + + def test_default_id_from_index(self, TodoManager): + tm = TodoManager() + tm.update([{"text": "First"}, {"text": "Second"}]) + assert tm.items[0]["id"] == "1" + assert tm.items[1]["id"] == "2" + + def test_status_case_insensitive(self, TodoManager): + tm = TodoManager() + tm.update([{"id": "1", "text": "Foo", "status": "PENDING"}]) + assert tm.items[0]["status"] == "pending" + + def test_text_stripped(self, TodoManager): + tm = TodoManager() + tm.update([{"id": "1", "text": " spaced ", "status": "pending"}]) + assert tm.items[0]["text"] == "spaced" + + +# -- update() validation errors -- + +class TestUpdateValidation: + def test_over_20_raises(self, TodoManager): + tm = TodoManager() + items = [{"id": str(i), "text": f"T{i}", "status": "pending"} for i in range(21)] + with pytest.raises(ValueError, match="Max 20"): + tm.update(items) + + def test_empty_text_raises(self, TodoManager): + tm = TodoManager() + with pytest.raises(ValueError, match="text required"): + tm.update([{"id": "1", "text": "", "status": "pending"}]) + + def test_whitespace_only_text_raises(self, TodoManager): + tm = TodoManager() + with pytest.raises(ValueError, match="text required"): + tm.update([{"id": "1", "text": " ", "status": "pending"}]) + + def test_invalid_status_raises(self, TodoManager): + tm = TodoManager() + with pytest.raises(ValueError, match="invalid status"): + tm.update([{"id": "1", "text": "Foo", "status": "done"}]) + + def test_two_in_progress_raises(self, TodoManager): + tm = TodoManager() + with pytest.raises(ValueError, match="Only one task"): + tm.update([ + {"id": "1", "text": "A", "status": "in_progress"}, + {"id": "2", "text": "B", "status": "in_progress"}, + ])