diff --git a/.gitignore b/.gitignore
index 71c7af4b..9b4e81f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,5 @@ coverage/
.worktrees/
homepage/public/demo/
.private/
+__pycache__/
+*.pyc
diff --git a/understand-anything-plugin/agents/file-analyzer.md b/understand-anything-plugin/agents/file-analyzer.md
index 38895267..d86c540d 100644
--- a/understand-anything-plugin/agents/file-analyzer.md
+++ b/understand-anything-plugin/agents/file-analyzer.md
@@ -213,7 +213,9 @@ Using the script's structural data and file categories, create edges:
| `implements` | A class implements an interface in the project | `0.9` | `forward` |
| `exports` | File exports a function or class node you created (only for exported items — use IN ADDITION to `contains`, not instead of it) | `0.8` | `forward` |
| `depends_on` | File has runtime dependency on another project file (broader than imports -- includes dynamic requires, lazy loads) | `0.6` | `forward` |
-| `tested_by` | Source file is tested by a test file (infer from test file imports and naming conventions) | `0.5` | `forward` |
+| `tested_by` | Production file is exercised by a test file. Emit when you see the test importing/using the production file. Use direction `production → test` if you can; the merge script will flip inverted edges and dedupe. | `0.5` | `forward` |
+
+**Note on `tested_by`:** It's fine to emit even if you're unsure of the direction (you typically see the relationship while analyzing the *test* file, where the import points back at production). The merge script (`merge-batch-graphs.py`) canonicalizes direction to `production → test` and drops semantically broken edges (test↔test, prod↔prod, orphan endpoint). Path-convention pairing supplements anything you miss.
#### Edges for non-code files:
diff --git a/understand-anything-plugin/packages/dashboard/src/components/CustomNode.tsx b/understand-anything-plugin/packages/dashboard/src/components/CustomNode.tsx
index 01c912f8..39fe3197 100644
--- a/understand-anything-plugin/packages/dashboard/src/components/CustomNode.tsx
+++ b/understand-anything-plugin/packages/dashboard/src/components/CustomNode.tsx
@@ -151,9 +151,19 @@ function CustomNodeComponent({
{data.nodeType}
-
- {data.complexity}
-
+
+
+ {data.complexity}
+
+ {data.tags?.includes("tested") && (
+
+ )}
+
diff --git a/understand-anything-plugin/packages/dashboard/src/components/GraphView.tsx b/understand-anything-plugin/packages/dashboard/src/components/GraphView.tsx
index 891d9971..4ba8d1be 100644
--- a/understand-anything-plugin/packages/dashboard/src/components/GraphView.tsx
+++ b/understand-anything-plugin/packages/dashboard/src/components/GraphView.tsx
@@ -533,6 +533,7 @@ function useLayerDetailTopology(): LayerDetailTopology & {
nodeType: node.type,
summary: node.summary,
complexity: node.complexity,
+ tags: node.tags,
isHighlighted: false,
searchScore: undefined,
isSelected: false,
@@ -908,6 +909,7 @@ function buildCustomFlowNode(
nodeType: node.type,
summary: node.summary,
complexity: node.complexity,
+ tags: node.tags,
isHighlighted: false,
searchScore: undefined,
isSelected: false,
diff --git a/understand-anything-plugin/skills/understand/SKILL.md b/understand-anything-plugin/skills/understand/SKILL.md
index 5f6ab214..07adeb68 100644
--- a/understand-anything-plugin/skills/understand/SKILL.md
+++ b/understand-anything-plugin/skills/understand/SKILL.md
@@ -278,6 +278,8 @@ This script reads all `batch-*.json` files from `$PROJECT_ROOT/.understand-anyth
- Drops dangling edges referencing missing nodes
- Logs all corrections and dropped items to stderr
+The merge script also runs a `tested_by` linker that canonicalizes test-coverage edges in two passes. **Pass 1** walks LLM-emitted `tested_by` edges and flips inverted ones in place (the LLM systematically emits `test → production` because it sees the import only when analyzing the test file); semantically broken edges (test↔test, prod↔prod, orphan endpoints) are dropped. **Pass 2** supplements with path-convention pairings (`X.ts` ↔ `X.test.ts`, JS/TS `__tests__/` and `
/test/` walk-out, Python in-package `tests/`, Go `_test.go` sibling, Maven/Gradle `src/test/...` ↔ `src/main/...`, .NET `/tests/` ↔ `/src/...` and `.Tests/` ↔ `/`). Production nodes that end up sourcing any `tested_by` edge get a `"tested"` tag. All resulting edges run `production → test`.
+
Output: `$PROJECT_ROOT/.understand-anything/intermediate/assembled-graph.json`
Include the script's warnings in `$PHASE_WARNINGS` for the reviewer.
diff --git a/understand-anything-plugin/skills/understand/merge-batch-graphs.py b/understand-anything-plugin/skills/understand/merge-batch-graphs.py
index 54a1b5c8..8963cbfd 100644
--- a/understand-anything-plugin/skills/understand/merge-batch-graphs.py
+++ b/understand-anything-plugin/skills/understand/merge-batch-graphs.py
@@ -19,6 +19,7 @@
"""
import json
+import os
import re
import sys
from collections import Counter
@@ -69,6 +70,34 @@
VALID_COMPLEXITY = {"simple", "moderate", "complex"}
+# ── tested_by linker configuration ────────────────────────────────────────
+
+# JS/TS family: a `.test.ts` file may be testing a `.ts`, `.tsx`, `.js`, etc.
+# We try each candidate extension in priority order.
+_JS_TS_EXTS: tuple[str, ...] = (".ts", ".tsx", ".js", ".jsx", ".mjs", ".cjs", ".vue")
+_JS_TS_TEST_EXTS: frozenset[str] = frozenset(_JS_TS_EXTS)
+
+# Mirrored production roots — when a test sits under `tests/`, it might be
+# mirroring `src/`, `app/`, `lib/`, or the project root.
+_MIRROR_PRODUCTION_ROOTS: tuple[str, ...] = ("src", "app", "lib", "")
+
+# Per-extension test-name patterns: ext → (prefix_patterns, suffix_patterns).
+# A basename qualifies as a test if its stem starts with any prefix or ends
+# with any suffix listed for its extension. JS/TS family is handled separately
+# because its `.test`/`.spec` infix sits on the *stem* of a double-extension
+# basename (e.g. `foo.test.ts` has ext `.ts`, stem `foo.test`).
+_TEST_NAME_PATTERNS: dict[str, tuple[tuple[str, ...], tuple[str, ...]]] = {
+ ".go": ((), ("_test",)),
+ ".py": (("test_",), ("_test",)),
+ ".java": ((), ("Test", "Tests", "IT")),
+ ".kt": ((), ("Test", "Tests")),
+ ".cs": ((), ("Test", "Tests")),
+ ".c": (("test_",), ("_test",)),
+ ".cpp": (("test_",), ("_test",)),
+ ".cc": (("test_",), ("_test",)),
+}
+
+
def _num(v: Any) -> float:
"""Coerce a value to float for safe comparison (handles string weights)."""
try:
@@ -190,6 +219,476 @@ def normalize_complexity(value: Any) -> tuple[str, str]:
return "moderate", "unknown"
+# ── Deterministic tested_by linker ────────────────────────────────────────
+#
+# Two-pass linker. Both passes produce canonical `production → test` edges.
+#
+# Pass 1 — preserve LLM semantics, fix direction.
+# The LLM sees the relationship only when analyzing a *test* file
+# (production files don't import their tests), so its emitted direction
+# is systematically wrong: source = the file it was analyzing = a test.
+# We do NOT strip these edges — the *pairing* is real evidence (the LLM
+# saw an import / using / same-package call). We just flip direction
+# when source is test + target is production. Edges that are
+# semantically broken (test↔test, production↔production, orphan endpoints)
+# are dropped.
+#
+# Pass 2 — supplement with path-convention pairings.
+# For test files the LLM didn't link to anything, fall back to filename
+# conventions (sibling `_test.go`, JS/TS `__tests__/`, Maven `src/test/`,
+# etc.) to find a production counterpart. Pairs already covered by
+# Pass 1 are skipped.
+#
+# Why this beats strip-and-rederive: real projects often violate the
+# linker's naming conventions (one Go `_test.go` covering several `.go`
+# files in the same package, .NET `/tests/X.cs` against
+# `/src/Y/X.cs`). Stripping LLM edges drops that real-world coverage
+# signal entirely. Swapping preserves it.
+
+def _path_segments(path: str) -> list[str]:
+ """Split a relative POSIX-style path into segments (ignoring empties)."""
+ return [seg for seg in path.split("/") if seg]
+
+
+def _basename(path: str) -> str:
+ return path.rsplit("/", 1)[-1] if "/" in path else path
+
+
+def is_test_path(path: str) -> bool:
+ """Return True if `path` looks like a test file by basename convention.
+
+ Files inside `tests/`, `__tests__/`, `test/`, or `spec/` directories that
+ do NOT carry a recognized test extension are treated as helpers/fixtures
+ and classified as non-test (so `__tests__/helpers.ts` is not a test).
+ """
+ stem, ext = os.path.splitext(_basename(path))
+
+ # JS/TS family: the test marker is an infix on the stem (foo.test.ts has
+ # stem "foo.test", ext ".ts"), not a prefix/suffix on the stem itself.
+ if ext in _JS_TS_TEST_EXTS:
+ return stem.endswith(".test") or stem.endswith(".spec")
+
+ patterns = _TEST_NAME_PATTERNS.get(ext)
+ if patterns is None:
+ return False
+ prefixes, suffixes = patterns
+ return any(stem.startswith(p) for p in prefixes) or any(
+ stem.endswith(s) for s in suffixes
+ )
+
+
+def _strip_test_infix(stem: str) -> str | None:
+ """For a JS/TS-family stem like `foo.test` or `foo.spec`, strip the
+ trailing `.test` / `.spec`. Returns None if no infix is present."""
+ for infix in (".test", ".spec"):
+ if stem.endswith(infix):
+ return stem[: -len(infix)]
+ return None
+
+
+def _join(dir_path: str, name: str) -> str:
+ """Join a (possibly empty) directory path to a basename with a single
+ slash, dropping the slash entirely when there is no directory."""
+ return f"{dir_path}/{name}" if dir_path else name
+
+
+def _add_unique(out: list[str], path: str) -> None:
+ """Append `path` to `out` unless it is empty or already present."""
+ if path and path not in out:
+ out.append(path)
+
+
+def _js_ts_sibling_candidates(dir_path: str, base_stem: str) -> list[str]:
+ """Build sibling candidates for a JS/TS family base stem.
+
+ `dir_path` is the parent dir (no trailing slash, may be empty).
+ `base_stem` is the stem with the test infix already stripped.
+ """
+ return [_join(dir_path, f"{base_stem}{e}") for e in _JS_TS_EXTS]
+
+
+def production_candidates(test_path: str) -> list[str]:
+ """For a test file path, return ordered candidate production paths.
+
+ The returned list is in priority order (sibling first, then `__tests__`
+ walk-out, then mirrored-tree variants). Duplicates are removed while
+ preserving order. Caller should pick the first candidate that resolves
+ to a known production node.
+ """
+ stem, ext = os.path.splitext(_basename(test_path))
+ segs = _path_segments(test_path)
+ dir_segs = segs[:-1]
+ dir_path = "/".join(dir_segs)
+
+ candidates: list[str] = []
+
+ # ── JS/TS family ──────────────────────────────────────────────────
+ if ext in _JS_TS_TEST_EXTS:
+ base_stem = _strip_test_infix(stem)
+ if base_stem is not None:
+ # 1. Sibling de-infix: prefer the same extension as the test, then
+ # the rest of the family.
+ _add_unique(candidates, _join(dir_path, f"{base_stem}{ext}"))
+ for c in _js_ts_sibling_candidates(dir_path, base_stem):
+ _add_unique(candidates, c)
+
+ # 2. Walk out of test-segregating subdir — drop the trailing
+ # __tests__/test/spec/tests segment. Some JS/TS projects use
+ # `/test/foo.spec.ts` or `/spec/foo.spec.ts` instead of
+ # the more idiomatic `__tests__/`; treat them the same.
+ if dir_segs and dir_segs[-1] in ("__tests__", "test", "spec", "tests"):
+ parent_dir = "/".join(dir_segs[:-1])
+ _add_unique(candidates, _join(parent_dir, f"{base_stem}{ext}"))
+ for c in _js_ts_sibling_candidates(parent_dir, base_stem):
+ _add_unique(candidates, c)
+
+ # 3. Mirrored tree: tests/foo/X.test.ts → src/foo/X.ts (and
+ # variants for app/lib/).
+ if dir_segs and dir_segs[0] in ("tests", "test", "__tests__"):
+ tail_path = "/".join(dir_segs[1:])
+ for root in _MIRROR_PRODUCTION_ROOTS:
+ new_dir = "/".join(p for p in (root, tail_path) if p)
+ _add_unique(candidates, _join(new_dir, f"{base_stem}{ext}"))
+ for c in _js_ts_sibling_candidates(new_dir, base_stem):
+ _add_unique(candidates, c)
+
+ # ── Go ────────────────────────────────────────────────────────────
+ elif ext == ".go" and stem.endswith("_test"):
+ base_stem = stem[: -len("_test")]
+ _add_unique(candidates, _join(dir_path, f"{base_stem}.go"))
+
+ # ── Python ────────────────────────────────────────────────────────
+ elif ext == ".py" and (stem.startswith("test_") or stem.endswith("_test")):
+ if stem.startswith("test_"):
+ base_stem = stem[len("test_"):]
+ else:
+ base_stem = stem[: -len("_test")]
+
+ # Sibling
+ _add_unique(candidates, _join(dir_path, f"{base_stem}.py"))
+
+ # Walk out of an in-package tests/ or test/ directory:
+ # `mypkg/tests/test_bar.py` → `mypkg/bar.py`. Common in Django apps
+ # and any project that colocates tests with the package they cover.
+ if dir_segs and dir_segs[-1] in ("tests", "test"):
+ parent_dir = "/".join(dir_segs[:-1])
+ _add_unique(candidates, _join(parent_dir, f"{base_stem}.py"))
+
+ # Mirrored: tests/foo/test_bar.py → src/foo/bar.py (and variants)
+ if dir_segs and dir_segs[0] in ("tests", "test"):
+ tail_path = "/".join(dir_segs[1:])
+ for root in _MIRROR_PRODUCTION_ROOTS:
+ new_dir = "/".join(p for p in (root, tail_path) if p)
+ _add_unique(candidates, _join(new_dir, f"{base_stem}.py"))
+
+ # ── Java ──────────────────────────────────────────────────────────
+ elif ext == ".java":
+ for suffix in ("Tests", "Test", "IT"):
+ if stem.endswith(suffix):
+ base_stem = stem[: -len(suffix)]
+ # Maven/Gradle layout: swap src/test/java/... → src/main/java/...
+ if (
+ len(dir_segs) >= 3
+ and dir_segs[0] == "src"
+ and dir_segs[1] == "test"
+ and dir_segs[2] == "java"
+ ):
+ new_dir = "/".join(["src", "main", "java"] + list(dir_segs[3:]))
+ _add_unique(candidates, f"{new_dir}/{base_stem}.java")
+ # Sibling fallback
+ _add_unique(candidates, _join(dir_path, f"{base_stem}.java"))
+ break
+
+ # ── Kotlin ────────────────────────────────────────────────────────
+ elif ext == ".kt":
+ for suffix in ("Tests", "Test"):
+ if stem.endswith(suffix):
+ base_stem = stem[: -len(suffix)]
+ if (
+ len(dir_segs) >= 3
+ and dir_segs[0] == "src"
+ and dir_segs[1] == "test"
+ and dir_segs[2] == "kotlin"
+ ):
+ new_dir = "/".join(["src", "main", "kotlin"] + list(dir_segs[3:]))
+ _add_unique(candidates, f"{new_dir}/{base_stem}.kt")
+ _add_unique(candidates, _join(dir_path, f"{base_stem}.kt"))
+ break
+
+ # ── C# ────────────────────────────────────────────────────────────
+ elif ext == ".cs":
+ for suffix in ("Tests", "Test"):
+ if stem.endswith(suffix):
+ base_stem = stem[: -len(suffix)]
+ # Sibling fallback (e.g. `Foo.Tests/BarTests.cs` ↔ same dir
+ # is rare but cheap to try).
+ _add_unique(candidates, _join(dir_path, f"{base_stem}.cs"))
+
+ # Walk out of an in-service `tests/` directory and search
+ # the sibling `src/` subtree. Handles layouts like
+ # `src//tests/BarTests.cs` ↔ `src//src/.../Bar.cs`
+ # (microservices-demo cartservice) and bare
+ # `/tests/BarTests.cs` ↔ `/src/Bar.cs`.
+ tests_idx = None
+ for i in range(len(dir_segs) - 1, -1, -1):
+ if dir_segs[i].lower() in ("tests", "test"):
+ tests_idx = i
+ break
+ if tests_idx is not None:
+ parent_segs = dir_segs[:tests_idx]
+ tail_segs = dir_segs[tests_idx + 1 :]
+ parent_dir = "/".join(parent_segs)
+ # `/.cs` (drop `tests/` entirely).
+ _add_unique(
+ candidates,
+ _join(parent_dir, f"{base_stem}.cs"),
+ )
+ # `/src//.cs` (mirror through src/).
+ src_dir = "/".join([*parent_segs, "src", *tail_segs])
+ _add_unique(candidates, _join(src_dir, f"{base_stem}.cs"))
+
+ # `.NET`-style sibling-project mirror: `My.App.Tests/...` ↔
+ # `My.App/...`. The test project's top dir typically ends in
+ # `.Tests`. Strip it and try the same tail under the sibling.
+ if dir_segs:
+ top = dir_segs[0]
+ if top.endswith(".Tests") or top.endswith(".Test"):
+ sibling = top[: -len(".Tests")] if top.endswith(".Tests") else top[: -len(".Test")]
+ if sibling:
+ mirror_dir = "/".join([sibling, *dir_segs[1:]])
+ _add_unique(
+ candidates,
+ _join(mirror_dir, f"{base_stem}.cs"),
+ )
+ break
+
+ # ── C/C++ ─────────────────────────────────────────────────────────
+ elif ext in {".c", ".cpp", ".cc"}:
+ if stem.startswith("test_"):
+ base_stem = stem[len("test_"):]
+ elif stem.endswith("_test"):
+ base_stem = stem[: -len("_test")]
+ else:
+ base_stem = None
+ if base_stem is not None:
+ _add_unique(candidates, _join(dir_path, f"{base_stem}{ext}"))
+
+ return candidates
+
+
+def _file_node_path(node: dict[str, Any]) -> str | None:
+ """Return the relative project path for a `file:`-prefixed node, else None."""
+ nid = node.get("id", "")
+ if not isinstance(nid, str) or not nid.startswith("file:"):
+ return None
+ fp = node.get("filePath")
+ if isinstance(fp, str) and fp:
+ return fp
+ return nid[len("file:"):]
+
+
+def _swap_tested_by_in_place(
+ edge: dict[str, Any], original_src: str, original_tgt: str
+) -> None:
+ """Flip an inverted `tested_by` edge so source becomes production and
+ target becomes the test file. Mutates `edge` in place; appends a
+ `[direction corrected]` audit marker to `description`.
+ """
+ edge["source"] = original_tgt
+ edge["target"] = original_src
+ edge["direction"] = "forward"
+ prev = edge.get("description")
+ edge["description"] = (
+ "Direction corrected (was test → production)"
+ if not prev
+ else f"{prev} [direction corrected]"
+ )
+
+
+def _ensure_tested_tag(node: dict[str, Any]) -> bool:
+ """Append "tested" to `node["tags"]`, coercing malformed `tags` to a
+ fresh list. Returns True if the tag was newly added.
+
+ `tags` from raw LLM batch JSON may be missing, None, a string, or
+ another non-list value — the TypeScript autoFixGraph normalizer that
+ handles this runs downstream of this script, so we defend here.
+ """
+ tags = node.get("tags")
+ if not isinstance(tags, list):
+ tags = []
+ node["tags"] = tags
+ if "tested" in tags:
+ return False
+ tags.append("tested")
+ return True
+
+
+def link_tests(
+ nodes_by_id: dict[str, dict[str, Any]],
+ edges: list[dict[str, Any]],
+) -> tuple[int, int, int, int]:
+ """Canonicalize `tested_by` edges and link unmatched test files.
+
+ Two passes (see module-level "Deterministic tested_by linker" comment
+ for the rationale):
+
+ 1. Walk every existing `tested_by` edge. Keep canonical
+ (production → test) edges as-is. Flip inverted (test → production)
+ edges so the swap preserves the LLM's pairing evidence with the
+ right direction. Drop edges that don't classify cleanly as
+ file ↔ file or where one endpoint is missing — they have no
+ recoverable meaning.
+ 2. For every test file not yet paired by Pass 1, walk path-convention
+ candidates and emit a fresh `production → test` edge for the first
+ match.
+
+ Tagging happens once per production node that ends up on the source
+ side of any `tested_by` edge (canonical, swapped, or supplemented).
+
+ Mutates `nodes_by_id` (adds "tested" tag) and `edges` (rewrites
+ in place: drops semantically broken edges, swaps inverted ones, appends
+ supplements).
+
+ Returns (added, dropped, tagged, swapped):
+ added: path-convention supplemental edges appended in Pass 2
+ dropped: pre-existing `tested_by` edges removed (unsalvageable)
+ tagged: production nodes newly tagged "tested"
+ swapped: pre-existing `tested_by` edges flipped (test → production
+ became production → test)
+ """
+ # ── Index file nodes by relative path; classify each as test/production.
+ # `is_prod` here means "is a known file node AND is not a test by
+ # path convention" — used both to validate edge endpoints and to drive
+ # path-convention candidate matching.
+ file_paths_to_nodes: dict[str, dict[str, Any]] = {}
+ node_id_to_classification: dict[str, str] = {} # id → "test" | "prod"
+ test_nodes: list[tuple[str, dict[str, Any]]] = []
+ for node in nodes_by_id.values():
+ path = _file_node_path(node)
+ if path is None:
+ continue
+ file_paths_to_nodes[path] = node
+ if is_test_path(path):
+ node_id_to_classification[node["id"]] = "test"
+ test_nodes.append((path, node))
+ else:
+ node_id_to_classification[node["id"]] = "prod"
+
+ # ── Pass 1: walk existing tested_by edges, canonicalize or drop.
+ # `covered` tracks (production_id, test_id) pairs that have a kept edge
+ # after this pass — used both to deduplicate within Pass 1 and to
+ # suppress duplicate supplements in Pass 2.
+ # `pair_to_idx` maps each kept pair to its slot in the compacted edges
+ # list, so a duplicate that arrives later with a higher weight can
+ # replace the earlier slot in place (mirrors Step 6's
+ # `weight > existing.weight` rule — without this, a 0.3-weight edge
+ # from batch 1 would silently outrank a 0.9-weight edge from batch 2
+ # because Step 6 only ever sees one of them).
+ # `swapped_pairs` records which surviving pairs came from a flipped
+ # edge, so the `swapped` counter reflects the FINAL output and
+ # doesn't double-count work done on edges that were later replaced.
+ covered: set[tuple[str, str]] = set()
+ pair_to_idx: dict[tuple[str, str], int] = {}
+ swapped_pairs: set[tuple[str, str]] = set()
+ dropped = 0
+ write_idx = 0
+ for edge in edges:
+ if edge.get("type") != "tested_by":
+ edges[write_idx] = edge
+ write_idx += 1
+ continue
+
+ src = edge.get("source", "")
+ tgt = edge.get("target", "")
+ src_class = node_id_to_classification.get(src)
+ tgt_class = node_id_to_classification.get(tgt)
+
+ # Both endpoints must be known file nodes; one test, one production.
+ # Anything else (orphan, test↔test, prod↔prod, non-file endpoint)
+ # has no recoverable meaning — drop it.
+ if (src_class, tgt_class) == ("prod", "test"):
+ pair = (src, tgt)
+ needs_swap = False
+ elif (src_class, tgt_class) == ("test", "prod"):
+ pair = (tgt, src)
+ needs_swap = True
+ else:
+ dropped += 1
+ continue
+
+ if pair in covered:
+ # Duplicate pair: keep the heavier-weight edge (mirrors the
+ # weight-aware dedup in Step 6, which can't help here because
+ # only one of the duplicates would reach it).
+ existing_idx = pair_to_idx[pair]
+ existing = edges[existing_idx]
+ if _num(edge.get("weight", 0)) > _num(existing.get("weight", 0)):
+ # Heavier — replace existing slot. Apply the swap (or not)
+ # only on the survivor, so we never spend cycles canonicalizing
+ # an edge we're about to drop.
+ if needs_swap:
+ _swap_tested_by_in_place(edge, src, tgt)
+ swapped_pairs.add(pair)
+ else:
+ # Replacement is canonical — if the previous winner came
+ # from a swap, the surviving slot is no longer a swap.
+ swapped_pairs.discard(pair)
+ edges[existing_idx] = edge
+ # else: existing is heavier or equal — keep it, drop the new edge.
+ dropped += 1
+ continue
+
+ if needs_swap:
+ _swap_tested_by_in_place(edge, src, tgt)
+ swapped_pairs.add(pair)
+ covered.add(pair)
+ pair_to_idx[pair] = write_idx
+ edges[write_idx] = edge
+ write_idx += 1
+ del edges[write_idx:]
+ swapped = len(swapped_pairs)
+
+ # ── Pass 2: path-convention supplement for tests not yet paired.
+ paired_test_ids = {test_id for (_prod_id, test_id) in covered}
+ added = 0
+ for test_path, test_node in test_nodes:
+ if test_node["id"] in paired_test_ids:
+ continue
+ for cand_path in production_candidates(test_path):
+ prod_node = file_paths_to_nodes.get(cand_path)
+ if prod_node is None:
+ continue
+ if is_test_path(cand_path):
+ # Don't link a test to another test even if naming aligns.
+ continue
+ pair = (prod_node["id"], test_node["id"])
+ if pair in covered:
+ continue
+ edges.append({
+ "source": prod_node["id"],
+ "target": test_node["id"],
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ "description": "Path-based pairing (deterministic)",
+ })
+ covered.add(pair)
+ added += 1
+ break
+
+ # ── Tag every production node that ended up sourcing a tested_by edge
+ # (covers Pass 1 canonical + swapped + Pass 2 supplements in one place).
+ tagged = 0
+ for prod_id, _test_id in covered:
+ prod_node = nodes_by_id.get(prod_id)
+ if prod_node is None:
+ continue
+ if _ensure_tested_tag(prod_node):
+ tagged += 1
+
+ return added, dropped, tagged, swapped
+
+
# ── Main merge + normalize ────────────────────────────────────────────────
def merge_and_normalize(batches: list[dict[str, Any]]) -> tuple[dict[str, Any], list[str]]:
@@ -273,6 +772,12 @@ def merge_and_normalize(batches: list[dict[str, Any]]) -> tuple[dict[str, Any],
duplicate_count += 1
nodes_by_id[nid] = node
+ # ── Step 5b: Deterministic tested_by linker ──────────────────────
+ # See module-level "Deterministic tested_by linker" section above.
+ tested_by_added, tested_by_dropped, tested_by_tagged, tested_by_swapped = link_tests(
+ nodes_by_id, all_edges
+ )
+
# ── Step 6: Deduplicate edges, drop dangling ─────────────────────
node_ids = set(nodes_by_id.keys())
edges_by_key: dict[tuple[str, str, str], dict] = {}
@@ -311,12 +816,32 @@ def merge_and_normalize(batches: list[dict[str, Any]]) -> tuple[dict[str, Any],
fixed_lines.append(f" {edges_rewritten:>4} × edge references rewritten after ID normalization")
if duplicate_count:
fixed_lines.append(f" {duplicate_count:>4} × duplicate node IDs removed (kept last)")
+ if tested_by_swapped:
+ fixed_lines.append(f" {tested_by_swapped:>4} × tested_by edges flipped (test → production became production → test)")
+ if tested_by_dropped:
+ fixed_lines.append(f" {tested_by_dropped:>4} × tested_by edges dropped (orphan endpoint or test↔test / prod↔prod pair)")
if fixed_lines:
report.append("")
- report.append(f"Fixed ({sum(id_fix_patterns.values()) + sum(complexity_fix_patterns.values()) + edges_rewritten + duplicate_count} corrections):")
+ total_fixes = (
+ sum(id_fix_patterns.values())
+ + sum(complexity_fix_patterns.values())
+ + edges_rewritten
+ + duplicate_count
+ + tested_by_swapped
+ + tested_by_dropped
+ )
+ report.append(f"Fixed ({total_fixes} corrections):")
report.extend(fixed_lines)
+ # Tested-by linker section — separate from Fixed since these are net-new
+ # additions, not corrections.
+ if tested_by_added or tested_by_tagged:
+ report.append("")
+ report.append("Tested-by linker:")
+ report.append(f" {tested_by_added:>4} × tested_by edges produced (path-convention supplement, production → test)")
+ report.append(f" {tested_by_tagged:>4} × production nodes tagged \"tested\"")
+
# Could not fix section — unknown patterns (grouped) + individual details
unfixable_total = (
len(unfixable)
diff --git a/understand-anything-plugin/skills/understand/test_merge_batch_graphs.py b/understand-anything-plugin/skills/understand/test_merge_batch_graphs.py
new file mode 100644
index 00000000..7c9c63f0
--- /dev/null
+++ b/understand-anything-plugin/skills/understand/test_merge_batch_graphs.py
@@ -0,0 +1,874 @@
+#!/usr/bin/env python3
+"""
+test_merge_batch_graphs.py — Tests for the deterministic tested_by linker.
+
+Run from this directory:
+ python -m unittest test_merge_batch_graphs.py -v
+"""
+
+from __future__ import annotations
+
+import importlib.util
+import sys
+import unittest
+from pathlib import Path
+from typing import Any
+
+
+# ── Module loader ─────────────────────────────────────────────────────────
+# `merge-batch-graphs.py` has a hyphen in its name, so we cannot `import` it
+# directly. Load it via importlib so we can call its module-level helpers.
+
+_HERE = Path(__file__).resolve().parent
+_MODULE_PATH = _HERE / "merge-batch-graphs.py"
+
+
+def _load_module() -> Any:
+ spec = importlib.util.spec_from_file_location("merge_batch_graphs", _MODULE_PATH)
+ if spec is None or spec.loader is None:
+ raise RuntimeError(f"Could not load module from {_MODULE_PATH}")
+ module = importlib.util.module_from_spec(spec)
+ sys.modules["merge_batch_graphs"] = module
+ spec.loader.exec_module(module)
+ return module
+
+
+mbg = _load_module()
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────
+
+def _file_node(path: str, **extra: Any) -> dict[str, Any]:
+ """Build a minimal file node with the given relative path."""
+ node: dict[str, Any] = {
+ "id": f"file:{path}",
+ "type": "file",
+ "name": path.rsplit("/", 1)[-1],
+ "filePath": path,
+ "summary": "",
+ "tags": [],
+ "complexity": "simple",
+ }
+ node.update(extra)
+ return node
+
+
+# ── is_test_path ──────────────────────────────────────────────────────────
+
+class IsTestPathTests(unittest.TestCase):
+ """Path classification: production vs. test."""
+
+ def test_js_ts_sibling_test_extensions(self) -> None:
+ for path in [
+ "src/foo.test.ts",
+ "src/foo.test.tsx",
+ "src/foo.test.js",
+ "src/foo.test.jsx",
+ "src/foo.test.mjs",
+ "src/foo.test.cjs",
+ "src/Component.test.vue",
+ "src/foo.spec.ts",
+ "src/foo.spec.tsx",
+ "src/foo.spec.js",
+ "src/Component.spec.vue",
+ ]:
+ with self.subTest(path=path):
+ self.assertTrue(mbg.is_test_path(path), f"{path} should be a test")
+
+ def test_underscore_test_dir_with_test_extension(self) -> None:
+ self.assertTrue(mbg.is_test_path("src/__tests__/foo.test.js"))
+ self.assertTrue(mbg.is_test_path("src/__tests__/foo.test.ts"))
+
+ def test_tests_directory_with_test_extension(self) -> None:
+ self.assertTrue(mbg.is_test_path("tests/foo/X.test.ts"))
+ self.assertTrue(mbg.is_test_path("test/foo/X.test.ts"))
+ self.assertTrue(mbg.is_test_path("spec/foo/X.spec.ts"))
+
+ def test_go_test_files(self) -> None:
+ self.assertTrue(mbg.is_test_path("internal/bar_test.go"))
+ self.assertTrue(mbg.is_test_path("bar_test.go"))
+
+ def test_python_test_files(self) -> None:
+ self.assertTrue(mbg.is_test_path("tests/test_bar.py"))
+ self.assertTrue(mbg.is_test_path("bar_test.py"))
+ self.assertTrue(mbg.is_test_path("test_bar.py"))
+
+ def test_java_test_files(self) -> None:
+ self.assertTrue(mbg.is_test_path("src/test/java/com/foo/BarTest.java"))
+ self.assertTrue(mbg.is_test_path("src/test/java/com/foo/BarTests.java"))
+ self.assertTrue(mbg.is_test_path("src/test/java/com/foo/BarIT.java"))
+
+ def test_kotlin_test_files(self) -> None:
+ self.assertTrue(mbg.is_test_path("src/test/kotlin/com/foo/BarTest.kt"))
+ self.assertTrue(mbg.is_test_path("src/test/kotlin/com/foo/BarTests.kt"))
+
+ def test_csharp_test_files(self) -> None:
+ self.assertTrue(mbg.is_test_path("Foo.Tests/BarTests.cs"))
+ self.assertTrue(mbg.is_test_path("Foo.Tests/BarTest.cs"))
+
+ def test_c_cpp_test_files(self) -> None:
+ self.assertTrue(mbg.is_test_path("test/bar_test.c"))
+ self.assertTrue(mbg.is_test_path("test/test_bar.c"))
+ self.assertTrue(mbg.is_test_path("test/bar_test.cpp"))
+ self.assertTrue(mbg.is_test_path("test/bar_test.cc"))
+ self.assertTrue(mbg.is_test_path("test/test_bar.cpp"))
+
+ def test_production_files_rejected(self) -> None:
+ for path in [
+ "src/foo.ts",
+ "src/foo.tsx",
+ "internal/bar.go",
+ "src/index.tsx",
+ "README.md",
+ "docs/guide.md",
+ "main.py",
+ "src/foo/bar.js",
+ "Foo.cs",
+ "Bar.kt",
+ "Bar.java",
+ ]:
+ with self.subTest(path=path):
+ self.assertFalse(mbg.is_test_path(path), f"{path} should be production")
+
+ def test_helper_in_tests_dir_without_test_extension_is_not_test(self) -> None:
+ # Files that live inside a __tests__ directory but don't carry a test
+ # extension are treated as helpers, not tests. We only count code files
+ # whose basename matches a test pattern. Assets/non-code files in
+ # tests/ are not flagged.
+ self.assertFalse(mbg.is_test_path("src/__tests__/helpers.ts"))
+ self.assertFalse(mbg.is_test_path("tests/fixtures/data.json"))
+
+
+# ── production_candidates ─────────────────────────────────────────────────
+
+class ProductionCandidatesTests(unittest.TestCase):
+ """For each test path, what production paths should we try?"""
+
+ def test_js_ts_sibling(self) -> None:
+ cands = mbg.production_candidates("src/foo/X.test.ts")
+ # Sibling de-infix should be in the candidate list, with .ts as the
+ # most natural target. Several extensions are tried because a .test.ts
+ # file might test a .tsx file.
+ self.assertIn("src/foo/X.ts", cands)
+ self.assertIn("src/foo/X.tsx", cands)
+
+ def test_js_ts_spec_sibling(self) -> None:
+ cands = mbg.production_candidates("src/foo/X.spec.tsx")
+ self.assertIn("src/foo/X.tsx", cands)
+ self.assertIn("src/foo/X.ts", cands)
+
+ def test_underscore_tests_dir(self) -> None:
+ cands = mbg.production_candidates("src/foo/__tests__/X.test.ts")
+ # Walking out of __tests__/ should produce src/foo/X.ts
+ self.assertIn("src/foo/X.ts", cands)
+
+ def test_mirrored_tests_tree(self) -> None:
+ cands = mbg.production_candidates("tests/foo/X.test.ts")
+ # Should try src/foo/X.ts, app/foo/X.ts, lib/foo/X.ts, foo/X.ts
+ self.assertIn("src/foo/X.ts", cands)
+ self.assertIn("foo/X.ts", cands)
+
+ def test_go_sibling(self) -> None:
+ cands = mbg.production_candidates("internal/bar_test.go")
+ self.assertIn("internal/bar.go", cands)
+
+ def test_python_test_prefix(self) -> None:
+ cands = mbg.production_candidates("tests/test_bar.py")
+ self.assertIn("tests/bar.py", cands)
+ # Also try mirrored layout
+ self.assertIn("bar.py", cands)
+ self.assertIn("src/bar.py", cands)
+
+ def test_python_test_suffix(self) -> None:
+ cands = mbg.production_candidates("foo/bar_test.py")
+ self.assertIn("foo/bar.py", cands)
+
+ def test_java_maven_layout(self) -> None:
+ cands = mbg.production_candidates("src/test/java/com/foo/BarTest.java")
+ self.assertIn("src/main/java/com/foo/Bar.java", cands)
+
+ def test_java_tests_suffix(self) -> None:
+ cands = mbg.production_candidates("src/test/java/com/foo/BarTests.java")
+ self.assertIn("src/main/java/com/foo/Bar.java", cands)
+
+ def test_java_it_suffix(self) -> None:
+ cands = mbg.production_candidates("src/test/java/com/foo/BarIT.java")
+ self.assertIn("src/main/java/com/foo/Bar.java", cands)
+
+ def test_kotlin_maven_layout(self) -> None:
+ cands = mbg.production_candidates("src/test/kotlin/com/foo/BarTest.kt")
+ self.assertIn("src/main/kotlin/com/foo/Bar.kt", cands)
+
+ def test_js_ts_test_subdir_walkout(self) -> None:
+ # Some JS/TS projects use `/test/` or `/spec/` instead of
+ # the more idiomatic `__tests__/`. Walk out of either.
+ cands_test = mbg.production_candidates("src/foo/test/X.test.ts")
+ self.assertIn("src/foo/X.ts", cands_test)
+ cands_spec = mbg.production_candidates("src/foo/spec/X.spec.ts")
+ self.assertIn("src/foo/X.ts", cands_spec)
+
+ def test_python_in_package_tests_walkout(self) -> None:
+ # `mypkg/tests/test_bar.py` (Django-app style) should pair with
+ # `mypkg/bar.py` — walk out of the in-package tests/ dir.
+ cands = mbg.production_candidates("mypkg/tests/test_bar.py")
+ self.assertIn("mypkg/bar.py", cands)
+ # Also nested:
+ cands_nested = mbg.production_candidates("a/b/test/test_bar.py")
+ self.assertIn("a/b/bar.py", cands_nested)
+
+ def test_csharp_tests_subdir_mirror_to_src(self) -> None:
+ # Real case from microservices-demo cartservice:
+ # `src/cartservice/tests/CartServiceTests.cs` ↔
+ # `src/cartservice/src/services/CartService.cs`. The candidate list
+ # only knows the basename; the matcher must produce a parent-level
+ # candidate that the linker can verify against the actual file index.
+ cands = mbg.production_candidates(
+ "src/cartservice/tests/CartServiceTests.cs"
+ )
+ # Drop tests/ entirely:
+ self.assertIn("src/cartservice/CartService.cs", cands)
+ # Mirror through `src/`:
+ self.assertIn("src/cartservice/src/CartService.cs", cands)
+ # Sibling fallback retained:
+ self.assertIn("src/cartservice/tests/CartService.cs", cands)
+
+ def test_csharp_dotnet_sibling_project_mirror(self) -> None:
+ # `.NET` convention: `MyApp.Tests/Foo/BarTests.cs` ↔
+ # `MyApp/Foo/Bar.cs`. Strip the `.Tests` suffix from the top dir
+ # and try the same tail under the sibling project.
+ cands = mbg.production_candidates("MyApp.Tests/Foo/BarTests.cs")
+ self.assertIn("MyApp/Foo/Bar.cs", cands)
+ # Also `.Test` (singular) is sometimes used.
+ cands_singular = mbg.production_candidates("MyApp.Test/BarTest.cs")
+ self.assertIn("MyApp/Bar.cs", cands_singular)
+
+ def test_priority_underscore_tests_sibling_before_walkup(self) -> None:
+ # When a test sits in `src/__tests__/`, the sibling-de-infix path
+ # (same directory) ranks before the walk-out path (parent directory).
+ # This is load-bearing: if a project happens to have both
+ # `src/__tests__/X.ts` and `src/X.ts`, we should pair with the
+ # nearer one.
+ cands = mbg.production_candidates("src/__tests__/X.test.ts")
+ self.assertEqual(cands[0], "src/__tests__/X.ts")
+ self.assertIn("src/X.ts", cands)
+ self.assertLess(cands.index("src/__tests__/X.ts"), cands.index("src/X.ts"))
+
+ def test_priority_mirrored_tree_sibling_before_mirror(self) -> None:
+ # `tests/foo/X.test.ts` sibling path is `tests/foo/X.ts`, which must
+ # rank above the mirrored `src/foo/X.ts` variant. Same rationale:
+ # closer pairing wins.
+ cands = mbg.production_candidates("tests/foo/X.test.ts")
+ self.assertEqual(cands[0], "tests/foo/X.ts")
+ self.assertIn("src/foo/X.ts", cands)
+ self.assertLess(cands.index("tests/foo/X.ts"), cands.index("src/foo/X.ts"))
+
+
+# ── link_tests (end-to-end) ───────────────────────────────────────────────
+
+class LinkTestsTests(unittest.TestCase):
+ """End-to-end behaviour of the linker against a node/edge set."""
+
+ def test_basic_pairing_emits_forward_edge(self) -> None:
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 1)
+ self.assertEqual(dropped, 0)
+ self.assertEqual(tagged, 1)
+ self.assertEqual(swapped, 0)
+ self.assertEqual(len(edges), 1)
+ edge = edges[0]
+ self.assertEqual(edge["source"], "file:src/foo.ts")
+ self.assertEqual(edge["target"], "file:src/foo.test.ts")
+ self.assertEqual(edge["type"], "tested_by")
+ self.assertEqual(edge["direction"], "forward")
+ self.assertEqual(edge["weight"], 0.5)
+ self.assertIn("tested", nodes_by_id["file:src/foo.ts"]["tags"])
+ # Test node is not tagged with "tested"
+ self.assertNotIn("tested", nodes_by_id["file:src/foo.test.ts"]["tags"])
+
+ def test_no_production_counterpart_no_edge(self) -> None:
+ nodes_by_id = {
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 0)
+ self.assertEqual(tagged, 0)
+ self.assertEqual(swapped, 0)
+ self.assertEqual(len(edges), 0)
+
+ def test_inverted_llm_edge_is_swapped_not_stripped(self) -> None:
+ # The LLM systematically emits tested_by edges as test → production
+ # (it sees the import only when analyzing the test file). The pairing
+ # is real evidence; we keep it and flip the direction in place.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.test.ts",
+ "target": "file:src/foo.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ "description": "from LLM",
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ # No supplement needed (the LLM edge already covers this pair).
+ self.assertEqual(added, 0)
+ self.assertEqual(swapped, 1)
+ self.assertEqual(dropped, 0)
+ self.assertEqual(tagged, 1)
+
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ edge = tested_by_edges[0]
+ self.assertEqual(edge["source"], "file:src/foo.ts")
+ self.assertEqual(edge["target"], "file:src/foo.test.ts")
+ # Provenance recorded so reviewers can audit the swap.
+ self.assertIn("direction corrected", edge["description"].lower())
+
+ def test_canonical_llm_edge_kept_unchanged(self) -> None:
+ # An LLM edge already in canonical direction should pass through
+ # untouched (no swap, no drop), and Pass 2 must not produce a
+ # duplicate.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.ts",
+ "target": "file:src/foo.test.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ "description": "original",
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual((added, dropped, swapped), (0, 0, 0))
+ self.assertEqual(tagged, 1)
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ self.assertEqual(tested_by_edges[0]["description"], "original")
+
+ def test_drops_test_to_test_edge(self) -> None:
+ # An LLM edge between two test files has no recoverable meaning.
+ nodes_by_id = {
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ "file:src/bar.test.ts": _file_node("src/bar.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.test.ts",
+ "target": "file:src/bar.test.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 0)
+ self.assertEqual(swapped, 0)
+ self.assertEqual(dropped, 1)
+ self.assertEqual(tagged, 0)
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(tested_by_edges, [])
+
+ def test_drops_orphan_endpoint_edge(self) -> None:
+ # Endpoint references a node that doesn't exist in nodes_by_id —
+ # nothing to canonicalize against, drop it.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.ts",
+ "target": "file:src/missing.test.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual((added, dropped, tagged, swapped), (0, 1, 0, 0))
+ self.assertEqual([e for e in edges if e["type"] == "tested_by"], [])
+
+ def test_dup_keeps_higher_weight_canonical(self) -> None:
+ # Two canonical tested_by edges for the same pair, weights 0.3 and
+ # 0.9. The heavier one must be kept — mirroring the weight-aware
+ # dedup at Step 6 (which never sees the discarded duplicate).
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.3},
+ {"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.9},
+ ]
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+ self.assertEqual((added, dropped, swapped), (0, 1, 0))
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ self.assertEqual(tested_by_edges[0]["weight"], 0.9)
+
+ def test_dup_lighter_inverted_dropped_no_swap_counted(self) -> None:
+ # Heavier canonical first, lighter inverted second. The lighter
+ # inverted edge is dropped without being swapped — no point
+ # canonicalizing an edge that's about to die in the dedup.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.9},
+ {"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.3},
+ ]
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+ self.assertEqual((added, dropped, swapped), (0, 1, 0))
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ self.assertEqual(tested_by_edges[0]["weight"], 0.9)
+ # Surviving edge is the original canonical — no audit marker.
+ self.assertNotIn(
+ "direction corrected",
+ (tested_by_edges[0].get("description") or "").lower(),
+ )
+
+ def test_dup_replaces_with_heavier_inverted(self) -> None:
+ # Lighter canonical first, heavier inverted second. The inverted
+ # edge gets swapped AND replaces the kept slot, since it's heavier.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.3},
+ {"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.9},
+ ]
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+ self.assertEqual(added, 0)
+ self.assertEqual(dropped, 1)
+ self.assertEqual(swapped, 1) # surviving edge IS a swap
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ edge = tested_by_edges[0]
+ self.assertEqual(edge["source"], "file:src/foo.ts")
+ self.assertEqual(edge["target"], "file:src/foo.test.ts")
+ self.assertEqual(edge["weight"], 0.9)
+ self.assertIn("direction corrected", edge["description"].lower())
+
+ def test_dup_swapped_then_canonical_heavier_clears_swapped_count(self) -> None:
+ # Inverted lighter first (swap is applied, swapped_pairs={pair}),
+ # then canonical heavier replaces — the surviving edge is canonical
+ # so `swapped` must drop back to 0.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.3},
+ {"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.9},
+ ]
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+ self.assertEqual(added, 0)
+ self.assertEqual(dropped, 1)
+ self.assertEqual(swapped, 0) # surviving edge is canonical, not a swap
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ self.assertEqual(tested_by_edges[0]["weight"], 0.9)
+
+ def test_dup_two_inverted_keeps_heavier_swapped_once(self) -> None:
+ # Both inverted, different weights. The heavier one wins the slot
+ # after both get swapped; `swapped` reflects the surviving edge,
+ # not the wasted swap on the dropped lighter one.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.3},
+ {"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
+ "type": "tested_by", "direction": "forward", "weight": 0.9},
+ ]
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+ self.assertEqual(added, 0)
+ self.assertEqual(dropped, 1)
+ self.assertEqual(swapped, 1)
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ edge = tested_by_edges[0]
+ self.assertEqual(edge["weight"], 0.9)
+ self.assertIn("direction corrected", edge["description"].lower())
+
+ def test_drops_duplicate_canonical_edges(self) -> None:
+ # Two LLM edges describing the same (production, test) pair — keep
+ # one, drop the other.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.ts",
+ "target": "file:src/foo.test.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ {
+ "source": "file:src/foo.test.ts",
+ "target": "file:src/foo.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 0)
+ # First edge was canonical; second was inverted but described the
+ # same pair → dropped as a duplicate (not a swap).
+ self.assertEqual(dropped, 1)
+ self.assertEqual(swapped, 0)
+ self.assertEqual(tagged, 1)
+ self.assertEqual(len([e for e in edges if e["type"] == "tested_by"]), 1)
+
+ def test_supplement_skips_pair_already_covered_by_llm(self) -> None:
+ # If the LLM (after swap) already covers a (production, test) pair
+ # that a path-convention candidate would also produce, Pass 2 must
+ # not emit a duplicate.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ "file:src/bar.ts": _file_node("src/bar.ts"),
+ "file:src/bar.test.ts": _file_node("src/bar.test.ts"),
+ }
+ # LLM only emitted (and inverted) the foo pair. The bar pair is
+ # covered by Pass 2 (path convention).
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.test.ts",
+ "target": "file:src/foo.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(swapped, 1)
+ self.assertEqual(added, 1) # only bar; foo is already covered
+ self.assertEqual(dropped, 0)
+ self.assertEqual(tagged, 2)
+ tested_by_edges = sorted(
+ [e for e in edges if e["type"] == "tested_by"],
+ key=lambda e: e["source"],
+ )
+ self.assertEqual(len(tested_by_edges), 2)
+
+ def test_swap_recovers_real_world_one_test_many_production(self) -> None:
+ # Real case from microservices-demo: shippingservice_test.go does
+ # not have a `shippingservice.go` sibling — it tests `main.go`,
+ # `tracker.go`, and `quote.go`. Path convention can't pair these,
+ # but the LLM saw the same-package usage and emitted the edges
+ # (with wrong direction). Swap should recover them.
+ nodes_by_id = {
+ "file:src/shippingservice/main.go": _file_node("src/shippingservice/main.go"),
+ "file:src/shippingservice/tracker.go": _file_node("src/shippingservice/tracker.go"),
+ "file:src/shippingservice/quote.go": _file_node("src/shippingservice/quote.go"),
+ "file:src/shippingservice/shippingservice_test.go": _file_node("src/shippingservice/shippingservice_test.go"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/shippingservice/shippingservice_test.go",
+ "target": "file:src/shippingservice/main.go",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ {
+ "source": "file:src/shippingservice/shippingservice_test.go",
+ "target": "file:src/shippingservice/tracker.go",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ ]
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(swapped, 2)
+ # Pass 2 fallback: the test file with no shippingservice.go sibling
+ # produces no path-convention candidate — we rely entirely on swap.
+ self.assertEqual(added, 0)
+ self.assertEqual(dropped, 0)
+ # main.go and tracker.go were tagged; quote.go was not (LLM didn't
+ # emit an edge for it, and there's no path-convention pair).
+ self.assertEqual(tagged, 2)
+ self.assertIn("tested", nodes_by_id["file:src/shippingservice/main.go"]["tags"])
+ self.assertIn("tested", nodes_by_id["file:src/shippingservice/tracker.go"]["tags"])
+ self.assertNotIn("tested", nodes_by_id["file:src/shippingservice/quote.go"]["tags"])
+
+ def test_unrelated_edges_pass_through(self) -> None:
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = [
+ {
+ "source": "file:src/foo.test.ts",
+ "target": "file:src/foo.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ {
+ "source": "file:src/foo.ts",
+ "target": "file:src/foo.test.ts",
+ "type": "imports",
+ "direction": "forward",
+ "weight": 0.7,
+ },
+ ]
+
+ mbg.link_tests(nodes_by_id, edges)
+
+ import_edges = [e for e in edges if e["type"] == "imports"]
+ self.assertEqual(len(import_edges), 1)
+ self.assertEqual(import_edges[0]["source"], "file:src/foo.ts")
+ self.assertEqual(import_edges[0]["target"], "file:src/foo.test.ts")
+ self.assertEqual(import_edges[0]["weight"], 0.7)
+
+ def test_direction_always_forward_production_to_test(self) -> None:
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/__tests__/foo.test.ts": _file_node("src/__tests__/foo.test.ts"),
+ "file:internal/bar.go": _file_node("internal/bar.go"),
+ "file:internal/bar_test.go": _file_node("internal/bar_test.go"),
+ "file:src/main/java/com/foo/Bar.java": _file_node("src/main/java/com/foo/Bar.java"),
+ "file:src/test/java/com/foo/BarTest.java": _file_node("src/test/java/com/foo/BarTest.java"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 3)
+ for edge in edges:
+ self.assertEqual(edge["type"], "tested_by")
+ self.assertEqual(edge["direction"], "forward")
+ # Target must be the test file (basename gives it away)
+ self.assertTrue(
+ mbg.is_test_path(edge["target"][len("file:"):]),
+ f"target {edge['target']} should classify as test",
+ )
+ self.assertFalse(
+ mbg.is_test_path(edge["source"][len("file:"):]),
+ f"source {edge['source']} should classify as production",
+ )
+
+ def test_idempotent(self) -> None:
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ mbg.link_tests(nodes_by_id, edges)
+ # Second invocation must not duplicate edges or tags. The first run
+ # added a canonical supplement edge; the second sees it as canonical
+ # in Pass 1 and keeps it without flipping or duplicating.
+ added2, dropped2, tagged2, swapped2 = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual((added2, dropped2, swapped2), (0, 0, 0))
+ # Tag was already present, so tagged counter for second call is 0.
+ self.assertEqual(tagged2, 0)
+ tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ tags = nodes_by_id["file:src/foo.ts"]["tags"]
+ self.assertEqual(tags.count("tested"), 1)
+
+ def test_first_matching_candidate_wins(self) -> None:
+ # If both src/foo.ts and src/foo.tsx exist, the linker should match
+ # exactly one of them (the first candidate). Sibling de-infix yields
+ # .ts before .tsx (since the test is named foo.test.ts).
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts"),
+ "file:src/foo.tsx": _file_node("src/foo.tsx"),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 1)
+ self.assertEqual(tagged, 1)
+ # Only one of them gets tagged.
+ ts_tagged = "tested" in nodes_by_id["file:src/foo.ts"]["tags"]
+ tsx_tagged = "tested" in nodes_by_id["file:src/foo.tsx"]["tags"]
+ self.assertTrue(ts_tagged != tsx_tagged, "exactly one should be tagged")
+ # The .ts file should win (it matches the test-file extension).
+ self.assertTrue(ts_tagged)
+
+ def test_does_not_match_test_to_test(self) -> None:
+ # If only test files exist, no edges are produced — we never link a
+ # test to another test.
+ nodes_by_id = {
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ "file:src/foo.spec.ts": _file_node("src/foo.spec.ts"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual(added, 0)
+ self.assertEqual(tagged, 0)
+
+ def test_does_not_duplicate_existing_tag(self) -> None:
+ # Production node already carries the "tested" tag — linker should
+ # not duplicate it.
+ nodes_by_id = {
+ "file:src/foo.ts": _file_node("src/foo.ts", tags=["tested", "core"]),
+ "file:src/foo.test.ts": _file_node("src/foo.test.ts"),
+ }
+ edges: list[dict[str, Any]] = []
+
+ mbg.link_tests(nodes_by_id, edges)
+
+ tags = nodes_by_id["file:src/foo.ts"]["tags"]
+ self.assertEqual(tags.count("tested"), 1)
+ self.assertIn("core", tags)
+
+ def test_empty_input(self) -> None:
+ edges: list[dict[str, Any]] = []
+ added, dropped, tagged, swapped = mbg.link_tests({}, edges)
+ self.assertEqual((added, dropped, tagged, swapped), (0, 0, 0, 0))
+ self.assertEqual(edges, [])
+
+ def test_node_without_filepath_falls_back_to_id(self) -> None:
+ # A file node with only `id` (no `filePath`) should still pair via
+ # the path embedded in the ID.
+ prod = {"id": "file:src/foo.ts", "type": "file", "name": "foo.ts", "tags": []}
+ test = {
+ "id": "file:src/foo.test.ts",
+ "type": "file",
+ "name": "foo.test.ts",
+ "tags": [],
+ }
+ nodes_by_id = {prod["id"]: prod, test["id"]: test}
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual((added, dropped, tagged, swapped), (1, 0, 1, 0))
+ self.assertEqual(edges[0]["source"], "file:src/foo.ts")
+ self.assertEqual(edges[0]["target"], "file:src/foo.test.ts")
+ self.assertIn("tested", prod["tags"])
+
+ def test_malformed_tags_is_replaced_not_crashed(self) -> None:
+ # Raw LLM batch JSON can ship `tags` as None, a string, or other
+ # non-list values — the TypeScript autoFixGraph normalizer runs
+ # downstream of this script. The linker must coerce instead of crash.
+ for bad_tags in (None, "tested,foo", "single", 0, {"k": "v"}):
+ with self.subTest(bad_tags=bad_tags):
+ prod = {
+ "id": "file:src/foo.ts",
+ "type": "file",
+ "name": "foo.ts",
+ "filePath": "src/foo.ts",
+ "tags": bad_tags,
+ }
+ test = _file_node("src/foo.test.ts")
+ nodes_by_id = {prod["id"]: prod, test["id"]: test}
+ edges: list[dict[str, Any]] = []
+
+ added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
+
+ self.assertEqual((added, dropped, tagged, swapped), (1, 0, 1, 0))
+ self.assertEqual(prod["tags"], ["tested"])
+
+
+# ── merge_and_normalize integration ───────────────────────────────────────
+
+class MergeIntegrationTests(unittest.TestCase):
+ """Verify the linker is wired into merge_and_normalize correctly."""
+
+ def test_linker_runs_during_merge(self) -> None:
+ batch = {
+ "nodes": [
+ {
+ "id": "file:src/foo.ts",
+ "type": "file",
+ "name": "foo.ts",
+ "filePath": "src/foo.ts",
+ "summary": "",
+ "tags": [],
+ "complexity": "simple",
+ },
+ {
+ "id": "file:src/foo.test.ts",
+ "type": "file",
+ "name": "foo.test.ts",
+ "filePath": "src/foo.test.ts",
+ "summary": "",
+ "tags": [],
+ "complexity": "simple",
+ },
+ ],
+ "edges": [
+ # An LLM-emitted (inverted) tested_by edge — should be dropped
+ {
+ "source": "file:src/foo.test.ts",
+ "target": "file:src/foo.ts",
+ "type": "tested_by",
+ "direction": "forward",
+ "weight": 0.5,
+ },
+ ],
+ }
+
+ assembled, _report = mbg.merge_and_normalize([batch])
+
+ # Output should have exactly one tested_by edge with canonical direction
+ tested_by_edges = [e for e in assembled["edges"] if e["type"] == "tested_by"]
+ self.assertEqual(len(tested_by_edges), 1)
+ self.assertEqual(tested_by_edges[0]["source"], "file:src/foo.ts")
+ self.assertEqual(tested_by_edges[0]["target"], "file:src/foo.test.ts")
+
+ # Production node tagged
+ prod_node = next(n for n in assembled["nodes"] if n["id"] == "file:src/foo.ts")
+ self.assertIn("tested", prod_node["tags"])
+
+
+if __name__ == "__main__":
+ unittest.main()