diff --git a/.env.example b/.env.example
index 2a19a20..66cb7d8 100644
--- a/.env.example
+++ b/.env.example
@@ -4,7 +4,8 @@
# This directory will be mounted read-only inside the container at /repos/project.
REPO_PATH=/path/to/your/repository
-# Optional overrides (defaults are already set in docker-compose.yml):
-# NEO4J_PASSWORD=changeme
+# Optional overrides (defaults are already set in docker-compose.*.yml):
+# NEO4J_PASSWORD=changeme # Neo4j stack
+# GRAPH_BACKEND=janusgraph # JanusGraph stack
# DEFAULT_REPO_ID=my-repo
# LOG_LEVEL=INFO
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 204d331..46ee5ff 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -165,26 +165,38 @@ jobs:
contents: write
steps:
+ - uses: actions/checkout@v4
+
- name: Download dist artifacts
uses: actions/download-artifact@v4
with:
name: dist
path: dist/
+ - name: Extract changelog for this version
+ run: |
+ # Strip leading "v" from tag to match changelog headers (e.g. v0.3.0 → 0.3.0)
+ VERSION="${GITHUB_REF_NAME#v}"
+ # Extract the section between this version's header and the next "## [" header
+          awk -v ver="$VERSION" 'index($0, "## [" ver "]") == 1 {found=1; next} /^## \[/ {if (found) exit} found {print}' CHANGELOG.md > /tmp/release-notes.md
+ # Append Docker image section
+ {
+ echo ""
+ echo "## Docker image"
+ echo ""
+ echo '```bash'
+ echo "docker pull ghcr.io/bitkaio/codesteward:${{ github.ref_name }}"
+ echo '```'
+ echo ""
+ echo "Full setup guide: [AGENT_SETUP.md](https://github.com/bitkaio/codesteward/blob/main/AGENT_SETUP.md)"
+ } >> /tmp/release-notes.md
+
- name: Create release
uses: softprops/action-gh-release@v2
with:
- generate_release_notes: true
+ body_path: /tmp/release-notes.md
files: |
dist/codesteward-graph/dist/*.whl
dist/codesteward-graph/dist/*.tar.gz
dist/codesteward-mcp/dist/*.whl
dist/codesteward-mcp/dist/*.tar.gz
- body: |
- ## Docker image
-
- ```bash
- docker pull ghcr.io/bitkaio/codesteward:${{ github.ref_name }}
- ```
-
- Full setup guide: [AGENT_SETUP.md](https://github.com/bitkaio/codesteward/blob/main/AGENT_SETUP.md)
diff --git a/AGENT_SETUP.md b/AGENT_SETUP.md
index fbda6e5..890be4a 100644
--- a/AGENT_SETUP.md
+++ b/AGENT_SETUP.md
@@ -61,7 +61,10 @@ $env:REPO_PATH = "C:\path\to\your\repo" # PowerShell
echo "REPO_PATH=/path/to/your/repository" > .env
# Start Neo4j + MCP server
-docker compose up -d
+docker compose -f docker-compose.neo4j.yml up -d
+
+# Or: JanusGraph (Apache 2.0 alternative)
+# docker compose -f docker-compose.janusgraph.yml up -d
```
The server starts at **`http://localhost:3000/sse`**. It already knows the
diff --git a/Dockerfile.mcp b/Dockerfile.mcp
index c41814c..981925b 100644
--- a/Dockerfile.mcp
+++ b/Dockerfile.mcp
@@ -4,7 +4,11 @@
# Build targets:
# default (graph-all) — all 14 tree-sitter language grammars
# graph-core — TypeScript / JavaScript / Python / Java only
-# base — no tree-sitter (Neo4j query-only mode)
+# base — no tree-sitter (query-only mode)
+#
+# Graph backend:
+# The image supports both Neo4j and JanusGraph. Select at runtime via
+# GRAPH_BACKEND=neo4j (default) or GRAPH_BACKEND=janusgraph.
#
# Taint analysis (optional):
# The codesteward-taint binary is bundled by default using the latest GitHub
@@ -15,11 +19,19 @@
# docker build -t codesteward-mcp . # latest taint
# docker build --build-arg TAINT_VERSION=0.1.0 -t codesteward-mcp . # pinned taint
# docker build --build-arg TAINT_VERSION=none -t codesteward-mcp . # no taint
+#
+# # Neo4j backend:
# docker run -p 3000:3000 \
# -e NEO4J_URI=bolt://neo4j:7687 \
# -e NEO4J_USER=neo4j \
# -e NEO4J_PASSWORD=secret \
# codesteward-mcp
+#
+# # JanusGraph backend:
+# docker run -p 3000:3000 \
+# -e GRAPH_BACKEND=janusgraph \
+# -e JANUSGRAPH_URL=ws://janusgraph:8182/gremlin \
+# codesteward-mcp
ARG PYTHON_VERSION=3.12
ARG INSTALL_EXTRA=graph-all
@@ -68,7 +80,7 @@ COPY packages/ packages/
# we can copy across. Both packages are workspace members, so pip resolves them
# from the local packages/ tree without hitting PyPI.
RUN pip install --no-cache-dir --prefix=/install \
- "./packages/codesteward-graph[${INSTALL_EXTRA}]" \
+ "./packages/codesteward-graph[${INSTALL_EXTRA},janusgraph]" \
"./packages/codesteward-mcp"
# ── final stage ──────────────────────────────────────────────────────────────
diff --git a/README.md b/README.md
index b76e9ee..28bdc7d 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@
Structural code graph server for AI agents.
- Parse any repository into a queryable Neo4j graph via tree-sitter AST — and expose it as an MCP tool interface your AI agent can call directly.
+ Parse any repository into a queryable graph (Neo4j or JanusGraph) via tree-sitter AST — and expose it as an MCP tool interface your AI agent can call directly.
---
@@ -34,8 +34,8 @@ Rather than scanning files repeatedly, the agent queries a pre-built graph — c
| Tool | Description |
| ---- | ----------- |
-| `graph_rebuild` | Parse a repository and write the structural graph to Neo4j (or run in stub mode without Neo4j) |
-| `codebase_graph_query` | Query via named templates (`lexical`, `referential`, `semantic`, `dependency`) or raw Cypher |
+| `graph_rebuild` | Parse a repository and write the structural graph to Neo4j or JanusGraph (or run in stub mode without a graph backend) |
+| `codebase_graph_query` | Query via named templates (`lexical`, `referential`, `semantic`, `dependency`) or raw passthrough (`cypher` / `gremlin`) |
| `graph_augment` | Add agent-inferred relationships (confidence < 1.0) back into the graph |
| `graph_status` | Return metadata: node/edge counts, last build time, Neo4j connectivity |
| `taint_analysis` | *(optional)* Run taint-flow analysis via the `codesteward-taint` binary and write `TAINT_FLOW` edges to Neo4j |
@@ -47,7 +47,7 @@ Rather than scanning files repeatedly, the agent queries a pre-built graph — c
One-time setup. Works across every repository on your machine without any per-project config.
Supports **Claude Code** and **OpenAI Codex CLI**.
-**Prerequisites:** [uv](https://docs.astral.sh/uv/) · Neo4j 5+ running locally · *(optional)* `codesteward-taint` on `PATH`
+**Prerequisites:** [uv](https://docs.astral.sh/uv/) · Neo4j 5+ or JanusGraph 1.0+ running locally · *(optional)* `codesteward-taint` on `PATH`
#### Claude Code
@@ -73,6 +73,17 @@ Merge this into your existing file (or create it):
Claude Code spawns the MCP server as a subprocess — no Docker, no volume mounts, no separate process to manage. `uvx` downloads and caches the package on first run. Neo4j credentials are passed as env vars; omit them to run in stub mode (no persistence).
+**JanusGraph alternative** — replace the `env` block above with:
+
+```json
+ "env": {
+ "GRAPH_BACKEND": "janusgraph",
+ "JANUSGRAPH_URL": "ws://localhost:8182/gremlin"
+ }
+```
+
+Add `janusgraph` to the extras: `"codesteward-mcp[graph-all,janusgraph]"`.
+
**2. Add the global instruction file at `~/.claude/CLAUDE.md`**
```bash
@@ -110,6 +121,8 @@ mcp_servers:
NEO4J_PASSWORD: "your-neo4j-password"
```
+For JanusGraph, replace the `env` block with `GRAPH_BACKEND: "janusgraph"` and `JANUSGRAPH_URL: "ws://localhost:8182/gremlin"`, and add the `janusgraph` extra to the args.
+
**2. Add the global instruction file at `~/AGENTS.md`**
```bash
@@ -173,7 +186,7 @@ Requires [uv](https://docs.astral.sh/uv/). `uvx` downloads and caches the packag
export REPO_PATH=/path/to/your/repository
# 2. Start Neo4j + MCP server
-docker compose up -d
+docker compose -f docker-compose.neo4j.yml up -d
# 3. Copy config templates into the repo you want to analyse
cp templates/.mcp.json /path/to/your/repository/
@@ -182,6 +195,22 @@ cp templates/CLAUDE.md /path/to/your/repository/
The server runs at **`http://localhost:3000/sse`**. Call `graph_rebuild()` with no arguments — the server already knows the repo path from the volume mount.
+### Docker + JanusGraph — persistent graph (Apache 2.0)
+
+```bash
+# 1. Point the server at your repository
+export REPO_PATH=/path/to/your/repository
+
+# 2. Start JanusGraph + MCP server
+docker compose -f docker-compose.janusgraph.yml up -d
+
+# 3. Copy config templates into the repo you want to analyse
+cp templates/.mcp.json /path/to/your/repository/
+cp templates/CLAUDE.md /path/to/your/repository/
+```
+
+Same workflow as the Neo4j stack — all named query templates work identically. Raw query passthrough uses Gremlin instead of Cypher.
+
### Manual Docker run
```bash
@@ -211,9 +240,12 @@ uv pip install "codesteward-mcp[graph-scala]" # Scala
uv pip install "codesteward-mcp[graph-c]" # C
uv pip install "codesteward-mcp[graph-cpp]" # C++
uv pip install "codesteward-mcp[graph-php]" # PHP
+
+# JanusGraph backend (alternative to Neo4j)
+uv pip install "codesteward-mcp[graph-all,janusgraph]"
```
-Requires Python 3.12+. Neo4j 5+ is optional — the server runs in stub mode without it.
+Requires Python 3.12+. Neo4j 5+ or JanusGraph 1.0+ is optional — the server runs in stub mode without a graph backend.
## Configuration
@@ -225,9 +257,11 @@ Priority: **CLI flags > env vars > YAML file > defaults**.
| Transport | `TRANSPORT` | `sse` | `sse`, `http`, or `stdio` |
| Host | `HOST` | `0.0.0.0` | HTTP bind host |
| Port | `PORT` | `3000` | HTTP bind port |
+| Graph backend | `GRAPH_BACKEND` | `neo4j` | `neo4j` or `janusgraph` |
| Neo4j URI | `NEO4J_URI` | `bolt://localhost:7687` | Neo4j connection URI |
| Neo4j user | `NEO4J_USER` | `neo4j` | Neo4j username |
| Neo4j password | `NEO4J_PASSWORD` | *(empty)* | Leave empty for stub mode |
+| JanusGraph URL | `JANUSGRAPH_URL` | `ws://localhost:8182/gremlin` | Gremlin Server WebSocket URL |
| Default tenant | `DEFAULT_TENANT_ID` | `local` | Tenant namespace |
| Default repo | `DEFAULT_REPO_ID` | *(empty)* | Repo ID |
| Default repo path | `DEFAULT_REPO_PATH` | `/repos/project` | Server-side path for `graph_rebuild` |
diff --git a/docker-compose.janusgraph.yml b/docker-compose.janusgraph.yml
new file mode 100644
index 0000000..458cf56
--- /dev/null
+++ b/docker-compose.janusgraph.yml
@@ -0,0 +1,96 @@
+# Codesteward MCP Graph Server — JanusGraph stack
+#
+# Drop-in alternative to docker-compose.neo4j.yml that uses JanusGraph
+# (Apache 2.0) instead of Neo4j. All MCP tools and named query templates
+# work identically.
+#
+# Includes:
+# janusgraph — graph database (Gremlin :8182)
+# codesteward — MCP server (HTTP+SSE :3000)
+#
+# Quick start:
+# 1. Set REPO_PATH to the absolute path of the repository you want to analyse:
+# export REPO_PATH=/path/to/your/repo # macOS / Linux
+# set REPO_PATH=C:\path\to\your\repo # Windows CMD
+# $env:REPO_PATH="C:\path\to\your\repo" # PowerShell
+# Or create a .env file in this directory:
+# echo "REPO_PATH=/path/to/your/repo" > .env
+# 2. Run: docker compose -f docker-compose.janusgraph.yml up -d
+# 3. MCP server: http://localhost:3000/mcp
+#
+# The graph_rebuild tool requires no arguments in this setup — the server
+# already knows the repository is mounted at /repos/project.
+#
+# docker compose -f docker-compose.janusgraph.yml down -v ← removes volumes
+
+services:
+
+ # ── JanusGraph ──────────────────────────────────────────────────────────────
+ janusgraph:
+ image: janusgraph/janusgraph:1.1.0
+ container_name: codesteward-janusgraph
+ restart: unless-stopped
+ environment:
+ # Default template: berkeleyje-lucene (BerkeleyDB JE storage + Lucene indexing)
+ # Embedded single-node backend — no external Cassandra/HBase required.
+ # Data is persisted to /var/lib/janusgraph via the volume below.
+ JANUS_PROPS_TEMPLATE: "berkeleyje-lucene"
+ JAVA_OPTIONS: "-Xms512m -Xmx1g"
+ ports:
+ - "8182:8182" # Gremlin Server (WebSocket)
+ volumes:
+ - janusgraph-data:/var/lib/janusgraph # persistent graph data + indexes
+ healthcheck:
+      test: ["CMD-SHELL",
+             "bash -c 'exec 3<>/dev/tcp/localhost/8182' || exit 1"]  # JRE image ships bash but no python3
+ interval: 15s
+ timeout: 10s
+ retries: 5
+ start_period: 45s
+
+ # ── Codesteward MCP Server ─────────────────────────────────────────────────
+ codesteward:
+ # Published image (recommended) — pulls from GitHub Container Registry.
+ # Replace the tag with a specific version (e.g. :0.1.0) to pin to a release.
+ image: ghcr.io/bitkaio/codesteward:latest
+ # ── Local build alternative (uncomment to build from source) ──────────────
+ # build:
+ # context: .
+ # dockerfile: Dockerfile.mcp
+ # args:
+ # INSTALL_EXTRA: "graph-all"
+ # image: codesteward-mcp:local
+ container_name: codesteward-mcp
+ restart: unless-stopped
+ depends_on:
+ janusgraph:
+ condition: service_healthy
+ environment:
+ TRANSPORT: "sse"
+ HOST: "0.0.0.0"
+ PORT: "3000"
+ GRAPH_BACKEND: "janusgraph"
+ JANUSGRAPH_URL: "ws://janusgraph:8182/gremlin"
+ DEFAULT_TENANT_ID: "local"
+ DEFAULT_REPO_ID: "" # optional: set to a stable name for your repo
+ DEFAULT_REPO_PATH: "/repos/project" # matches the volume mount below
+ WORKSPACE_BASE: "/workspace"
+ LOG_LEVEL: "INFO"
+ ports:
+ - "3000:3000"
+ volumes:
+ - mcp-workspace:/workspace
+ # The repository to analyse — set REPO_PATH in your environment or .env file.
+ # Falls back to the current directory if REPO_PATH is not set.
+ - ${REPO_PATH:-.}:/repos/project:ro
+ healthcheck:
+ test: ["CMD", "python", "-c",
+ "import socket; s=socket.socket(); s.settimeout(3); s.connect(('localhost',3000)); s.close()"]
+ interval: 30s
+ timeout: 5s
+ retries: 3
+ start_period: 15s
+
+volumes:
+ janusgraph-data:
+ mcp-workspace:
diff --git a/docker-compose.yml b/docker-compose.neo4j.yml
similarity index 93%
rename from docker-compose.yml
rename to docker-compose.neo4j.yml
index 706b686..076a21d 100644
--- a/docker-compose.yml
+++ b/docker-compose.neo4j.yml
@@ -1,9 +1,11 @@
-# Codesteward MCP Graph Server — local development stack
+# Codesteward MCP Graph Server — Neo4j stack
#
# Includes:
# neo4j — graph database (bolt :7687, browser :7474)
# codesteward — MCP server (HTTP+SSE :3000)
#
+# For a fully open-source alternative, see docker-compose.janusgraph.yml.
+#
# Quick start:
# 1. Set REPO_PATH to the absolute path of the repository you want to analyse:
# export REPO_PATH=/path/to/your/repo # macOS / Linux
@@ -11,14 +13,14 @@
# $env:REPO_PATH="C:\path\to\your\repo" # PowerShell
# Or create a .env file in this directory:
# echo "REPO_PATH=/path/to/your/repo" > .env
-# 2. Run: docker compose up -d
+# 2. Run: docker compose -f docker-compose.neo4j.yml up -d
# 3. MCP server: http://localhost:3000/mcp
# Neo4j Browser: http://localhost:7474 (neo4j / changeme)
#
# The graph_rebuild tool requires no arguments in this setup — the server
# already knows the repository is mounted at /repos/project.
#
-# docker compose down -v ← removes volumes (wipes graph data + Neo4j)
+# docker compose -f docker-compose.neo4j.yml down -v ← removes volumes
services:
diff --git a/packages/codesteward-graph/README.md b/packages/codesteward-graph/README.md
index c0c06c1..4f7132f 100644
--- a/packages/codesteward-graph/README.md
+++ b/packages/codesteward-graph/README.md
@@ -1,7 +1,7 @@
# codesteward-graph
Multi-language structural code graph builder — parses source repositories into
-`LexicalNode` + edge data and writes to Neo4j.
+`LexicalNode` + edge data and writes to Neo4j or JanusGraph.
Part of the [Codesteward MCP](https://github.com/bitkaio/codesteward-mcp) project.
For full documentation, setup guides, and the MCP server, see the main repository.
@@ -13,7 +13,8 @@ For full documentation, setup guides, and the MCP server, see the main repositor
- Extracts functions, classes, imports, call graphs, inheritance chains, and auth guard
annotations (`GUARDED_BY` / `PROTECTED_BY` edges)
- Resolves cross-file call relationships in a single post-parse pass
-- Writes to Neo4j with tenant + repo namespacing; operates in stub mode without Neo4j
+- Writes to Neo4j or JanusGraph with tenant + repo namespacing; operates in stub mode
+ without a graph backend
## Install
@@ -24,6 +25,9 @@ uv add "codesteward-graph[graph]"
# All 14 languages
uv add "codesteward-graph[graph-all]"
+# JanusGraph backend (alternative to Neo4j)
+uv add "codesteward-graph[janusgraph]"
+
# Without tree-sitter (COBOL only; all other parsers will raise ImportError)
uv add codesteward-graph
```
@@ -35,7 +39,7 @@ import asyncio
from codesteward.engine.graph_builder import GraphBuilder
async def main():
- builder = GraphBuilder() # stub mode — no Neo4j
+ builder = GraphBuilder() # stub mode — no graph backend
summary = await builder.build_graph(
repo_path="/path/to/repo",
tenant_id="local",
diff --git a/packages/codesteward-graph/pyproject.toml b/packages/codesteward-graph/pyproject.toml
index 2711600..1ed00e1 100644
--- a/packages/codesteward-graph/pyproject.toml
+++ b/packages/codesteward-graph/pyproject.toml
@@ -49,6 +49,8 @@ graph-c = ["tree-sitter>=0.21", "tree-sitter-c>=0.21"]
graph-cpp = ["tree-sitter>=0.21", "tree-sitter-cpp>=0.21"]
graph-rust = ["tree-sitter>=0.21", "tree-sitter-rust>=0.21"]
graph-php = ["tree-sitter>=0.21", "tree-sitter-php>=0.21"]
+# JanusGraph backend via Gremlin (Apache TinkerPop)
+janusgraph = ["gremlinpython>=3.7"]
# All 14 languages at once
graph-all = [
"codesteward-graph[graph]",
diff --git a/packages/codesteward-graph/src/codesteward/engine/backends/__init__.py b/packages/codesteward-graph/src/codesteward/engine/backends/__init__.py
new file mode 100644
index 0000000..491232f
--- /dev/null
+++ b/packages/codesteward-graph/src/codesteward/engine/backends/__init__.py
@@ -0,0 +1,35 @@
+"""Graph database backend abstraction.
+
+Provides a unified interface for graph storage backends (Neo4j, JanusGraph)
+so that the rest of the codebase is backend-agnostic.
+"""
+
+from codesteward.engine.backends.base import GraphBackend
+from codesteward.engine.backends.neo4j import Neo4jBackend
+
+__all__ = ["GraphBackend", "Neo4jBackend", "get_backend"]
+
+
+def get_backend(backend_type: str, **kwargs) -> GraphBackend:
+ """Factory for graph backends.
+
+ Args:
+        backend_type: One of ``"neo4j"`` or ``"janusgraph"`` (case-insensitive).
+        **kwargs: Backend-specific connection parameters.
+
+    Returns:
+        Configured GraphBackend instance.
+
+    Raises:
+        ValueError: If ``backend_type`` is not recognised.
+    """
+    match backend_type.strip().lower():
+ case "neo4j":
+ return Neo4jBackend(**kwargs)
+ case "janusgraph":
+ from codesteward.engine.backends.janusgraph import JanusGraphBackend
+ return JanusGraphBackend(**kwargs)
+ case _:
+ raise ValueError(
+ f"Unknown graph backend {backend_type!r}; valid: 'neo4j', 'janusgraph'"
+ )
diff --git a/packages/codesteward-graph/src/codesteward/engine/backends/base.py b/packages/codesteward-graph/src/codesteward/engine/backends/base.py
new file mode 100644
index 0000000..d4bb107
--- /dev/null
+++ b/packages/codesteward-graph/src/codesteward/engine/backends/base.py
@@ -0,0 +1,163 @@
+"""Abstract base class for graph database backends."""
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+
+class GraphBackend(ABC):
+ """Unified interface for graph storage backends.
+
+ Implementations must handle connection lifecycle, node/edge writes,
+ queries, and cleanup. All methods are async to support both Neo4j
+ (native async) and JanusGraph (HTTP/WebSocket to Gremlin Server).
+ """
+
+ @abstractmethod
+ def is_connected(self) -> bool:
+ """Return True if the backend has a live connection configured."""
+
+ @abstractmethod
+ async def close(self) -> None:
+ """Release backend resources (driver/connection pool)."""
+
+ # ── Write operations ─────────────────────────────────────────────────
+
+ @abstractmethod
+ async def write_nodes(self, nodes: list[dict[str, Any]]) -> int:
+ """Upsert nodes into the graph.
+
+ Args:
+ nodes: List of node property dicts. Each must contain at minimum
+ ``node_id``, ``node_type``, ``name``, ``file``, ``tenant_id``,
+ ``repo_id``.
+
+ Returns:
+ Number of nodes written.
+ """
+
+ @abstractmethod
+ async def write_edges(self, edges_by_type: dict[str, list[dict[str, Any]]]) -> int:
+ """Upsert edges into the graph, grouped by relationship type.
+
+ Args:
+ edges_by_type: Mapping of uppercase edge type (e.g. ``"CALLS"``)
+ to list of edge property dicts. Each dict must contain
+ ``edge_id``, ``source_id``, ``target_id``, ``target_name``,
+ ``tenant_id``, ``repo_id``, ``file``, ``line``.
+
+ Returns:
+ Total number of edges written.
+ """
+
+ @abstractmethod
+ async def delete_file_nodes(self, tenant_id: str, repo_id: str, file_path: str) -> None:
+ """Delete all nodes and edges scoped to a specific file.
+
+ Args:
+ tenant_id: Tenant namespace.
+ repo_id: Repository identifier.
+ file_path: Repo-relative path of the file to remove.
+ """
+
+ # ── Query operations ─────────────────────────────────────────────────
+
+ @abstractmethod
+ async def query_named(
+ self,
+ query_type: str,
+ tenant_id: str,
+ repo_id: str,
+ filter_str: str,
+ limit: int,
+ ) -> list[dict[str, Any]]:
+ """Execute a named query template.
+
+ Args:
+ query_type: One of ``lexical``, ``referential``, ``semantic``,
+ ``dependency``.
+ tenant_id: Tenant namespace.
+ repo_id: Repository identifier.
+ filter_str: Substring filter on name/file (empty = no filter).
+ limit: Maximum rows to return.
+
+ Returns:
+ List of result row dicts.
+
+ Raises:
+ ValueError: If ``query_type`` is not recognised.
+ """
+
+ @abstractmethod
+ async def query_raw(
+ self,
+ query: str,
+ params: dict[str, Any],
+ ) -> list[dict[str, Any]]:
+ """Execute a raw backend-native query.
+
+ For Neo4j this is Cypher; for JanusGraph this is Gremlin.
+
+ Args:
+ query: The raw query string.
+ params: Query parameters.
+
+ Returns:
+ List of result row dicts.
+ """
+
+ @abstractmethod
+ async def count_nodes(self, tenant_id: str, repo_id: str) -> int | None:
+ """Return the total node count for a tenant/repo, or None on error.
+
+ Args:
+ tenant_id: Tenant namespace.
+ repo_id: Repository identifier.
+
+ Returns:
+ Node count, or None if the query fails.
+ """
+
+ # ── Augment (agent-inferred edges) ───────────────────────────────────
+
+ @abstractmethod
+ async def write_augment_edge(
+ self,
+ edge_type: str,
+ source_id: str,
+ target_id: str,
+ target_name: str,
+ tenant_id: str,
+ repo_id: str,
+ edge_id: str,
+ file: str,
+ line: int | None,
+ confidence: float,
+ source: str,
+ rationale: str,
+ ) -> None:
+ """Write a single agent-inferred edge.
+
+ Args:
+ edge_type: Relationship type (lowercase, e.g. ``"calls"``).
+ source_id: Source node ID.
+ target_id: Target node ID.
+ target_name: Human-readable target name.
+ tenant_id: Tenant namespace.
+ repo_id: Repository identifier.
+ edge_id: Unique edge identifier.
+ file: File path associated with the edge.
+ line: Line number, or None.
+            confidence: Confidence score in the interval (0.0, 1.0).
+ source: Source tag (e.g. ``"agent:security-agent"``).
+ rationale: Brief explanation for the inferred edge.
+ """
+
+ @property
+ @abstractmethod
+ def backend_name(self) -> str:
+ """Return the backend identifier (e.g. ``"neo4j"``, ``"janusgraph"``)."""
+
+ @property
+ @abstractmethod
+ def raw_query_language(self) -> str:
+ """Return the name of the raw query language (e.g. ``"cypher"``, ``"gremlin"``)."""
diff --git a/packages/codesteward-graph/src/codesteward/engine/backends/janusgraph.py b/packages/codesteward-graph/src/codesteward/engine/backends/janusgraph.py
new file mode 100644
index 0000000..d34c2c6
--- /dev/null
+++ b/packages/codesteward-graph/src/codesteward/engine/backends/janusgraph.py
@@ -0,0 +1,397 @@
+"""JanusGraph graph backend implementation via Gremlin (Apache TinkerPop).
+
+Uses the ``gremlinpython`` driver to communicate with JanusGraph Server
+over WebSocket. All queries are expressed in Gremlin traversal language.
+
+JanusGraph property graph mapping:
+ - LexicalNode → vertex with label ``LexicalNode``
+ - Edge types → edge labels (``CALLS``, ``IMPORTS``, ``GUARDED_BY``, …)
+ - Properties → vertex/edge properties matching the Neo4j schema
+"""
+
+from typing import Any
+
+import structlog
+
+from codesteward.engine.backends.base import GraphBackend
+
+log = structlog.get_logger()
+
+
+class JanusGraphBackend(GraphBackend):
+ """JanusGraph backend using gremlinpython.
+
+ Args:
+ url: Gremlin Server WebSocket URL (e.g. ``ws://localhost:8182/gremlin``).
+ """
+
+ def __init__(self, url: str = "ws://localhost:8182/gremlin") -> None:
+ self._url = url
+ self._connection: Any | None = None
+ self._g: Any | None = None
+ try:
+ from gremlin_python.driver.driver_remote_connection import (
+ DriverRemoteConnection,
+ )
+ from gremlin_python.process.anonymous_traversal import traversal
+
+ self._connection = DriverRemoteConnection(url, "g")
+ self._g = traversal().with_remote(self._connection)
+ log.info("janusgraph_connected", url=url)
+ except Exception as exc:
+ log.error("janusgraph_connection_failed", url=url, error=str(exc))
+ self._connection = None
+ self._g = None
+
+ def is_connected(self) -> bool:
+ return self._g is not None
+
+    async def close(self) -> None:
+        import contextlib
+        with contextlib.suppress(Exception):
+            if self._connection is not None:
+                self._connection.close()
+        self._connection = self._g = None  # is_connected() now reports False
+
+ # ── Write operations ─────────────────────────────────────────────────
+
+ async def write_nodes(self, nodes: list[dict[str, Any]]) -> int:
+ if not self._g or not nodes:
+ return 0
+
+ from gremlin_python.process.graph_traversal import __
+ from gremlin_python.process.traversal import T
+
+ g = self._g
+ for node in nodes:
+ t = g.V().has("LexicalNode", "node_id", node["node_id"]).fold().coalesce(
+ __.unfold(),
+ __.addV("LexicalNode").property("node_id", node["node_id"]),
+ )
+ for key, val in node.items():
+ if key != "node_id" and val is not None:
+ t = t.property(key, val)
+ t.iterate()
+
+ return len(nodes)
+
+ async def write_edges(self, edges_by_type: dict[str, list[dict[str, Any]]]) -> int:
+ if not self._g or not edges_by_type:
+ return 0
+
+ from gremlin_python.process.graph_traversal import __
+
+ g = self._g
+ total = 0
+ for rel_type, typed_edges in edges_by_type.items():
+ for edge in typed_edges:
+ # Ensure target vertex exists
+ g.V().has("LexicalNode", "node_id", edge["target_id"]).fold().coalesce(
+ __.unfold(),
+ __.addV("LexicalNode")
+ .property("node_id", edge["target_id"])
+ .property("name", edge.get("target_name", ""))
+ .property("node_type", "external")
+ .property("tenant_id", edge.get("tenant_id", ""))
+ .property("repo_id", edge.get("repo_id", "")),
+ ).iterate()
+
+                # Upsert edge; "line" is set only when known (JanusGraph rejects nulls)
+                t = g.V().has("LexicalNode", "node_id", edge["source_id"]).as_("src") \
+                    .V().has("LexicalNode", "node_id", edge["target_id"]).as_("tgt") \
+                    .coalesce(
+                        __.select("src").outE(rel_type).where(
+                            __.has("edge_id", edge["edge_id"])
+                        ),
+                        __.select("src").addE(rel_type).to(__.select("tgt"))
+                        .property("edge_id", edge["edge_id"]),
+                    ).property("file", edge.get("file", ""))
+                if edge.get("line") is not None:
+                    t = t.property("line", edge["line"])
+                t.iterate()
+
+ total += 1
+
+ return total
+
+ async def delete_file_nodes(self, tenant_id: str, repo_id: str, file_path: str) -> None:
+ if not self._g:
+ return
+ self._g.V().has("LexicalNode", "tenant_id", tenant_id) \
+ .has("repo_id", repo_id) \
+ .has("file", file_path) \
+ .drop().iterate()
+
+ # ── Query operations ─────────────────────────────────────────────────
+
+ async def query_named(
+ self,
+ query_type: str,
+ tenant_id: str,
+ repo_id: str,
+ filter_str: str,
+ limit: int,
+ ) -> list[dict[str, Any]]:
+ if not self._g:
+ return []
+
+ match query_type:
+ case "lexical":
+ return self._query_lexical(tenant_id, repo_id, filter_str, limit)
+ case "referential":
+ return self._query_referential(tenant_id, repo_id, filter_str, limit)
+ case "semantic":
+ return self._query_semantic(tenant_id, repo_id, filter_str, limit)
+ case "dependency":
+ return self._query_dependency(tenant_id, repo_id, filter_str, limit)
+ case _:
+ raise ValueError(
+ f"unknown query_type '{query_type}'; "
+ f"valid: lexical, referential, semantic, dependency, {self.raw_query_language}"
+ )
+
+ def _query_lexical(
+ self, tenant_id: str, repo_id: str, filter_str: str, limit: int
+ ) -> list[dict[str, Any]]:
+ from gremlin_python.process.graph_traversal import __
+ from gremlin_python.process.traversal import TextP
+
+ g = self._g
+ t = g.V().has_label("LexicalNode") \
+ .has("tenant_id", tenant_id) \
+ .has("repo_id", repo_id)
+
+ if filter_str:
+ t = t.or_(
+ __.has("name", TextP.containing(filter_str)),
+ __.has("file", TextP.containing(filter_str)),
+ )
+
+        t = t.order().by("file").by(__.coalesce(__.values("line_start"), __.constant(0)))
+ results = t.limit(limit).project(
+ "type", "name", "file", "line_start", "line_end", "language", "is_async"
+ ).by(__.values("node_type")) \
+ .by(__.values("name")) \
+ .by(__.values("file")) \
+ .by(__.coalesce(__.values("line_start"), __.constant(None))) \
+ .by(__.coalesce(__.values("line_end"), __.constant(None))) \
+ .by(__.coalesce(__.values("language"), __.constant(None))) \
+ .by(__.coalesce(__.values("is_async"), __.constant(False))) \
+ .toList()
+
+ return results
+
+ def _query_referential(
+ self, tenant_id: str, repo_id: str, filter_str: str, limit: int
+ ) -> list[dict[str, Any]]:
+ from gremlin_python.process.graph_traversal import __
+ from gremlin_python.process.traversal import TextP
+
+ g = self._g
+ edge_labels = ["CALLS", "IMPORTS", "EXTENDS", "GUARDED_BY", "PROTECTED_BY"]
+
+ t = g.V().has_label("LexicalNode") \
+ .has("tenant_id", tenant_id) \
+ .has("repo_id", repo_id)
+
+ if filter_str:
+ t = t.or_(
+ __.has("name", TextP.containing(filter_str)),
+ __.has("file", TextP.containing(filter_str)),
+ )
+
+ results = t.outE(*edge_labels).as_("e") \
+ .inV().as_("tgt") \
+ .select("e").outV().as_("src") \
+ .select("src", "e", "tgt") \
+ .project(
+ "from_name", "from_file", "edge_type",
+ "to_name", "to_file", "to_node_type", "line"
+ ) \
+ .by(__.select("src").values("name")) \
+ .by(__.select("src").values("file")) \
+ .by(__.select("e").label()) \
+ .by(__.select("tgt").values("name")) \
+ .by(__.select("tgt").coalesce(__.values("file"), __.constant(""))) \
+ .by(__.select("tgt").coalesce(__.values("node_type"), __.constant("external"))) \
+ .by(__.select("e").coalesce(__.values("line"), __.constant(None))) \
+ .limit(limit) \
+ .toList()
+
+ return results
+
+ def _query_semantic(
+ self, tenant_id: str, repo_id: str, filter_str: str, limit: int
+ ) -> list[dict[str, Any]]:
+ from gremlin_python.process.graph_traversal import __
+ from gremlin_python.process.traversal import TextP
+
+ g = self._g
+ t = g.V().has_label("LexicalNode") \
+ .has("tenant_id", tenant_id) \
+ .has("repo_id", repo_id)
+
+ if filter_str:
+ t = t.or_(
+ __.has("name", TextP.containing(filter_str)),
+ __.has("file", TextP.containing(filter_str)),
+ )
+
+ results = t.outE("TAINT_FLOW") \
+ .has("sanitized", False).as_("e") \
+ .inV().as_("tgt") \
+ .select("e").outV().as_("src") \
+ .select("src", "e", "tgt") \
+ .project(
+ "source_name", "source_file", "sink_name", "sink_file",
+ "cwe", "hops", "level", "framework"
+ ) \
+ .by(__.select("src").values("name")) \
+ .by(__.select("src").values("file")) \
+ .by(__.select("tgt").values("name")) \
+ .by(__.select("tgt").values("file")) \
+ .by(__.select("e").coalesce(__.values("cwe"), __.constant(None))) \
+ .by(__.select("e").coalesce(__.values("hops"), __.constant(None))) \
+ .by(__.select("e").coalesce(__.values("level"), __.constant(None))) \
+ .by(__.select("e").coalesce(__.values("framework"), __.constant(None))) \
+            .order().by(__.select("e").coalesce(__.values("hops"), __.constant(0))) \
+ .limit(limit) \
+ .toList()
+
+ return results
+
+ def _query_dependency(
+ self, tenant_id: str, repo_id: str, filter_str: str, limit: int
+ ) -> list[dict[str, Any]]:
+ from gremlin_python.process.graph_traversal import __
+ from gremlin_python.process.traversal import TextP
+
+ g = self._g
+ t = g.V().has_label("LexicalNode") \
+ .has("tenant_id", tenant_id) \
+ .has("repo_id", repo_id) \
+ .has("node_type", "file")
+
+ results = t.outE("DEPENDS_ON").inV().as_("pkg") \
+ .select("pkg")
+
+ if filter_str:
+ results = results.has("name", TextP.containing(filter_str))
+
+ results = results.path().by(__.values("file")).by(__.label()).by(
+ __.project("package", "type").by(__.values("name")).by(
+ __.coalesce(__.values("node_type"), __.constant("external"))
+ )
+ ).limit(limit).toList()
+
+ # Flatten path results
+ rows: list[dict[str, Any]] = []
+ for path in results:
+ objects = path.objects if hasattr(path, "objects") else path
+ if len(objects) >= 3:
+ pkg_info = objects[2]
+ rows.append({
+ "package": pkg_info.get("package", ""),
+ "type": pkg_info.get("type", "external"),
+ "referenced_from": objects[0],
+ })
+
+ return rows
+
+ async def query_raw(
+ self,
+ query: str,
+ params: dict[str, Any],
+ ) -> list[dict[str, Any]]:
+ if not self._g:
+ return []
+
+ from gremlin_python.driver.client import Client
+
+ client = Client(self._url, "g")
+ try:
+ result_set = client.submit(query, params)
+ results = result_set.all().result()
+ # Normalise Gremlin results to list of dicts
+ rows: list[dict[str, Any]] = []
+ for item in results:
+ if isinstance(item, dict):
+ rows.append(item)
+ else:
+ rows.append({"result": item})
+ return rows
+ finally:
+ client.close()
+
+ async def count_nodes(self, tenant_id: str, repo_id: str) -> int | None:
+ if not self._g:
+ return None
+ try:
+ result = self._g.V().has_label("LexicalNode") \
+ .has("tenant_id", tenant_id) \
+ .has("repo_id", repo_id) \
+ .count().next()
+ return result
+ except Exception:
+ return None
+
+ async def write_augment_edge(
+ self,
+ edge_type: str,
+ source_id: str,
+ target_id: str,
+ target_name: str,
+ tenant_id: str,
+ repo_id: str,
+ edge_id: str,
+ file: str,
+ line: int | None,
+ confidence: float,
+ source: str,
+ rationale: str,
+ ) -> None:
+ if not self._g:
+ return
+
+ from gremlin_python.process.graph_traversal import __
+
+ g = self._g
+ rel_type = edge_type.upper()
+
+ # Ensure target vertex exists
+ g.V().has("LexicalNode", "node_id", target_id).fold().coalesce(
+ __.unfold(),
+ __.addV("LexicalNode")
+ .property("node_id", target_id)
+ .property("name", target_name)
+ .property("node_type", "external")
+ .property("tenant_id", tenant_id)
+ .property("repo_id", repo_id)
+ .property("confidence", confidence)
+ .property("source", source),
+ ).iterate()
+
+        # Upsert edge; "line" is set only when known (JanusGraph rejects nulls)
+        t = g.V().has("LexicalNode", "node_id", source_id).as_("src") \
+            .V().has("LexicalNode", "node_id", target_id).as_("tgt") \
+            .coalesce(
+                __.select("src").outE(rel_type).where(
+                    __.has("edge_id", edge_id)
+                ),
+                __.select("src").addE(rel_type).to(__.select("tgt"))
+                .property("edge_id", edge_id),
+            ) \
+            .property("file", file) \
+            .property("confidence", confidence) \
+            .property("source", source) \
+            .property("rationale", rationale)
+        if line is not None: t = t.property("line", line)
+        t.iterate()
+
+ @property
+ def backend_name(self) -> str:
+ return "janusgraph"
+
+ @property
+ def raw_query_language(self) -> str:
+ return "gremlin"
diff --git a/packages/codesteward-graph/src/codesteward/engine/backends/neo4j.py b/packages/codesteward-graph/src/codesteward/engine/backends/neo4j.py
new file mode 100644
index 0000000..b9359cd
--- /dev/null
+++ b/packages/codesteward-graph/src/codesteward/engine/backends/neo4j.py
@@ -0,0 +1,283 @@
+"""Neo4j graph backend implementation."""
+
+import json
+from typing import Any
+
+import structlog
+
+from codesteward.engine.backends.base import GraphBackend
+
+log = structlog.get_logger()
+
+# ---------------------------------------------------------------------------
+# Cypher templates for named query types
+# ---------------------------------------------------------------------------
+
+# Every template takes $tenant_id, $repo_id, $filter (substring match; pass
+# '' for no filter) and $limit as Cypher parameters.
+_CYPHER_TEMPLATES: dict[str, str] = {
+    # Symbol inventory: nodes matched by name or file substring.
+    "lexical": """
+        MATCH (n:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
+        WHERE ($filter = '' OR n.name CONTAINS $filter OR n.file CONTAINS $filter)
+        RETURN n.node_type AS type, n.name AS name, n.file AS file,
+               n.line_start AS line_start, n.line_end AS line_end,
+               n.language AS language, n.is_async AS is_async
+        ORDER BY n.file, n.line_start
+        LIMIT $limit
+    """,
+    # Structural edges between symbols (calls, imports, inheritance, guards).
+    "referential": """
+        MATCH (src:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
+              -[r:CALLS|IMPORTS|EXTENDS|GUARDED_BY|PROTECTED_BY]->(tgt)
+        WHERE ($filter = '' OR src.name CONTAINS $filter OR src.file CONTAINS $filter)
+        RETURN src.name AS from_name, src.file AS from_file,
+               type(r) AS edge_type,
+               tgt.name AS to_name, tgt.file AS to_file,
+               tgt.node_type AS to_node_type,
+               r.line AS line
+        ORDER BY src.file, r.line
+        LIMIT $limit
+    """,
+    # Unsanitized taint-flow paths (populated by codesteward-taint).
+    "semantic": """
+        MATCH (src:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
+              -[r:TAINT_FLOW]->(tgt:LexicalNode)
+        WHERE ($filter = '' OR src.name CONTAINS $filter OR src.file CONTAINS $filter)
+          AND NOT r.sanitized
+        RETURN src.name AS source_name, src.file AS source_file,
+               tgt.name AS sink_name, tgt.file AS sink_file,
+               r.cwe AS cwe, r.hops AS hops,
+               r.level AS level, r.framework AS framework
+        ORDER BY r.hops ASC, src.file ASC
+        LIMIT $limit
+    """,
+    # External package dependencies referenced per file.
+    "dependency": """
+        MATCH (src:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id,
+                                node_type: 'file'})
+              -[r:DEPENDS_ON]->(pkg:LexicalNode)
+        WHERE ($filter = '' OR pkg.name CONTAINS $filter)
+        RETURN DISTINCT pkg.name AS package, pkg.node_type AS type,
+               src.file AS referenced_from
+        ORDER BY pkg.name
+        LIMIT $limit
+    """,
+}
+
+
+class Neo4jBackend(GraphBackend):
+    """Neo4j graph backend using the official async driver.
+
+    When no password is configured, or driver construction fails, the
+    backend stays disconnected and all operations degrade to no-ops or
+    empty results (stub mode).
+
+    Args:
+        uri: Neo4j bolt URI.
+        user: Neo4j username.
+        password: Neo4j password.
+    """
+
+    def __init__(self, uri: str = "", user: str = "", password: str = "") -> None:
+        self._driver: Any | None = None
+        if password:
+            try:
+                # Imported lazily so the package imports cleanly when the
+                # neo4j driver is not installed (JanusGraph-only setups).
+                import neo4j
+                self._driver = neo4j.AsyncGraphDatabase.driver(
+                    uri, auth=(user, password)
+                )
+            except Exception as exc:
+                # Leave _driver as None; is_connected() reports False.
+                log.error("neo4j_driver_init_failed", error=str(exc))
+
+    def is_connected(self) -> bool:
+        """Return True when a driver object was constructed.
+
+        NOTE(review): the async driver connects lazily, so this does not
+        prove the server is reachable — confirm whether a ping is needed.
+        """
+        return self._driver is not None
+
+    async def close(self) -> None:
+        """Close the underlying async driver, if one was created."""
+        if self._driver is not None:
+            await self._driver.close()
+
+    async def write_nodes(self, nodes: list[dict[str, Any]]) -> int:
+        """Upsert node property dicts, keyed by ``node_id``.
+
+        Args:
+            nodes: Flat property dicts (see ``serialize_node_props``).
+
+        Returns:
+            Number of nodes submitted (0 when disconnected or empty).
+        """
+        if not self._driver or not nodes:
+            return 0
+        cypher = """
+            UNWIND $nodes AS n
+            MERGE (node:LexicalNode {node_id: n.node_id})
+            SET node += n
+        """
+        async with self._driver.session() as session:
+            await session.run(cypher, nodes=nodes)
+        return len(nodes)
+
+    async def write_edges(self, edges_by_type: dict[str, list[dict[str, Any]]]) -> int:
+        """Upsert edges, one batched statement per relationship type.
+
+        Missing target vertices are created as 'external' stubs. The
+        relationship type is interpolated into the Cypher text because
+        labels cannot be parameterized — callers must pass validated
+        edge types only.
+
+        Args:
+            edges_by_type: Edge property dicts grouped by relationship type.
+
+        Returns:
+            Total number of edges submitted.
+        """
+        if not self._driver or not edges_by_type:
+            return 0
+        total = 0
+        async with self._driver.session() as session:
+            for rel_type, typed_edges in edges_by_type.items():
+                cypher = f"""
+                    UNWIND $edges AS e
+                    MATCH (src:LexicalNode {{node_id: e.source_id}})
+                    MERGE (tgt:LexicalNode {{node_id: e.target_id}})
+                    ON CREATE SET tgt.name = e.target_name, tgt.node_type = 'external',
+                                  tgt.tenant_id = e.tenant_id, tgt.repo_id = e.repo_id
+                    MERGE (src)-[r:{rel_type} {{edge_id: e.edge_id}}]->(tgt)
+                    SET r.file = e.file, r.line = e.line
+                """
+                await session.run(cypher, edges=typed_edges)
+                total += len(typed_edges)
+        return total
+
+    async def delete_file_nodes(self, tenant_id: str, repo_id: str, file_path: str) -> None:
+        """Delete all nodes (and their edges) scoped to one file.
+
+        Args:
+            tenant_id: Tenant namespace.
+            repo_id: Repository identifier.
+            file_path: Repo-relative path of the file to remove.
+        """
+        if not self._driver:
+            return
+        cypher = """
+            MATCH (n:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id, file: $file})
+            DETACH DELETE n
+        """
+        async with self._driver.session() as session:
+            await session.run(cypher, tenant_id=tenant_id, repo_id=repo_id, file=file_path)
+
+    async def query_named(
+        self,
+        query_type: str,
+        tenant_id: str,
+        repo_id: str,
+        filter_str: str,
+        limit: int,
+    ) -> list[dict[str, Any]]:
+        """Run one of the named Cypher templates.
+
+        Args:
+            query_type: Template key ('lexical', 'referential', ...).
+            tenant_id: Tenant namespace.
+            repo_id: Repository identifier.
+            filter_str: Substring filter ('' for no filter).
+            limit: Maximum rows to return.
+
+        Returns:
+            Result rows as dicts (empty when disconnected).
+
+        Raises:
+            ValueError: When ``query_type`` is not a known template.
+        """
+        template = _CYPHER_TEMPLATES.get(query_type)
+        if template is None:
+            # Raised (not returned) so callers can distinguish a bad query
+            # type from an empty result set.
+            raise ValueError(
+                f"unknown query_type '{query_type}'; "
+                f"valid: {list(_CYPHER_TEMPLATES) + [self.raw_query_language]}"
+            )
+        if not self._driver:
+            return []
+        async with self._driver.session() as session:
+            result = await session.run(
+                template,
+                tenant_id=tenant_id,
+                repo_id=repo_id,
+                filter=filter_str,
+                limit=limit,
+            )
+            return [dict(record) for record in await result.data()]
+
+    async def query_raw(
+        self,
+        query: str,
+        params: dict[str, Any],
+    ) -> list[dict[str, Any]]:
+        """Run a raw Cypher statement with the given parameters.
+
+        Args:
+            query: Full Cypher statement (may reference $tenant_id, etc.).
+            params: Cypher parameters passed through verbatim.
+
+        Returns:
+            Result rows as dicts (empty when disconnected).
+        """
+        if not self._driver:
+            return []
+        async with self._driver.session() as session:
+            result = await session.run(query, **params)
+            return [dict(record) for record in await result.data()]
+
+    async def count_nodes(self, tenant_id: str, repo_id: str) -> int | None:
+        """Count LexicalNode vertices for a tenant/repo scope.
+
+        Returns:
+            Node count, or None when disconnected or the query fails.
+        """
+        if not self._driver:
+            return None
+        try:
+            async with self._driver.session() as session:
+                result = await session.run(
+                    """
+                    MATCH (n:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
+                    RETURN count(n) AS node_count
+                    """,
+                    tenant_id=tenant_id,
+                    repo_id=repo_id,
+                )
+                record = await result.single()
+                return record["node_count"] if record else None
+        except Exception:
+            # Best-effort: callers treat None as "count unavailable".
+            return None
+
+    async def write_augment_edge(
+        self,
+        edge_type: str,
+        source_id: str,
+        target_id: str,
+        target_name: str,
+        tenant_id: str,
+        repo_id: str,
+        edge_id: str,
+        file: str,
+        line: int | None,
+        confidence: float,
+        source: str,
+        rationale: str,
+    ) -> None:
+        """Upsert one agent-inferred edge, creating a stub target if needed.
+
+        Keyed by ``edge_id`` so repeated calls update rather than duplicate.
+        The relationship label is interpolated into the Cypher text —
+        ``edge_type`` must be validated upstream.
+
+        Args:
+            edge_type: Edge type name (lower case, validated upstream).
+            source_id: node_id of the existing source vertex.
+            target_id: node_id of the target vertex (created if missing).
+            target_name: Display name for a newly created target vertex.
+            tenant_id: Tenant namespace.
+            repo_id: Repository identifier.
+            edge_id: Stable identifier used to upsert the edge.
+            file: Repo-relative file the inference refers to.
+            line: Source line, or None when unknown.
+            confidence: Agent confidence score for the inference.
+            source: Provenance tag for the inference.
+            rationale: Free-text justification from the agent.
+        """
+        if not self._driver:
+            return
+        rel_type = edge_type.upper()
+        cypher = f"""
+            MATCH (src:LexicalNode {{node_id: $source_id}})
+            MERGE (tgt:LexicalNode {{node_id: $target_id}})
+            ON CREATE SET tgt.name = $target_name, tgt.node_type = 'external',
+                          tgt.tenant_id = $tenant_id, tgt.repo_id = $repo_id,
+                          tgt.confidence = $confidence, tgt.source = $source
+            MERGE (src)-[r:{rel_type} {{edge_id: $edge_id}}]->(tgt)
+            SET r.file = $file, r.line = $line,
+                r.confidence = $confidence, r.source = $source,
+                r.rationale = $rationale
+        """
+        async with self._driver.session() as session:
+            await session.run(
+                cypher,
+                source_id=source_id,
+                target_id=target_id,
+                target_name=target_name,
+                tenant_id=tenant_id,
+                repo_id=repo_id,
+                edge_id=edge_id,
+                file=file,
+                line=line,
+                confidence=confidence,
+                source=source,
+                rationale=rationale,
+            )
+
+    @property
+    def backend_name(self) -> str:
+        """Canonical backend identifier reported in tool output."""
+        return "neo4j"
+
+    @property
+    def raw_query_language(self) -> str:
+        """Raw passthrough query language accepted by this backend."""
+        return "cypher"
+
+
+def serialize_node_props(node) -> dict[str, Any]:
+    """Convert a LexicalNode to a dict suitable for Neo4j MERGE.
+
+    Args:
+        node: A LexicalNode instance.
+
+    Returns:
+        Property dict with JSON-serialized metadata.
+    """
+    return {
+        "node_id": node.node_id,
+        "node_type": node.node_type,
+        "name": node.name,
+        "file": node.file,
+        "line_start": node.line_start,
+        "line_end": node.line_end,
+        "language": node.language,
+        "tenant_id": node.tenant_id,
+        "repo_id": node.repo_id,
+        "exported": node.exported,
+        "is_async": node.is_async,
+        # Nested metadata is stored as a JSON string ('{}' when absent)
+        # because graph properties must be scalar values.
+        "metadata": json.dumps(node.metadata) if node.metadata else "{}",
+    }
+
+
+def serialize_edge_props(edge) -> dict[str, Any]:
+    """Convert a GraphEdge to a dict suitable for Neo4j MERGE.
+
+    All fields are copied as-is (already scalar — no JSON encoding needed).
+
+    Args:
+        edge: A GraphEdge instance.
+
+    Returns:
+        Property dict for edge upsert.
+    """
+    return {
+        "source_id": edge.source_id,
+        "target_id": edge.target_id,
+        "target_name": edge.target_name,
+        "tenant_id": edge.tenant_id,
+        "repo_id": edge.repo_id,
+        "edge_id": edge.edge_id,
+        "file": edge.file,
+        "line": edge.line,
+    }
diff --git a/packages/codesteward-graph/src/codesteward/engine/graph_builder.py b/packages/codesteward-graph/src/codesteward/engine/graph_builder.py
index ad7574e..a6dd03e 100644
--- a/packages/codesteward-graph/src/codesteward/engine/graph_builder.py
+++ b/packages/codesteward-graph/src/codesteward/engine/graph_builder.py
@@ -210,6 +210,11 @@ class Neo4jWriter:
When ``driver`` is None (tests, local dev without Neo4j), all write
operations are no-ops and a warning is logged.
+ .. deprecated::
+ Use :class:`~codesteward.engine.backends.neo4j.Neo4jBackend` via
+ :class:`GraphWriter` instead. This class is retained for backward
+ compatibility.
+
Cypher patterns used:
- Nodes: ``MERGE (n:LexicalNode {node_id: $id}) SET n += $props``
- Edges: ``MERGE (a)-[r:IMPORTS]->(b)`` style relationships via MATCH + MERGE
@@ -332,6 +337,99 @@ async def delete_file_nodes(self, tenant_id: str, repo_id: str, file_path: str)
await session.run(cypher, tenant_id=tenant_id, repo_id=repo_id, file=file_path)
+class GraphWriter:
+    """Backend-agnostic graph writer that delegates to a GraphBackend.
+
+    When no backend is provided, all write operations are no-ops (stub mode).
+    """
+
+    def __init__(self, backend: Any | None = None) -> None:
+        """Initialise with an optional graph backend.
+
+        Args:
+            backend: A ``GraphBackend`` instance, or None for stub mode.
+        """
+        # Local import — presumably to avoid a circular import between the
+        # builder and backend modules; confirm before hoisting to file level.
+        from codesteward.engine.backends.base import GraphBackend as _GB
+        self._backend: _GB | None = backend
+        if backend is None:
+            log.warning("graph_writer_stub_mode", reason="No graph backend provided")
+
+    def is_connected(self) -> bool:
+        """Return True if a graph backend is configured and connected."""
+        return self._backend is not None and self._backend.is_connected()
+
+    async def write_nodes(self, nodes: list[LexicalNode]) -> int:
+        """Upsert lexical nodes via the configured backend.
+
+        Args:
+            nodes: Nodes to write.
+
+        Returns:
+            Number of nodes written (0 in stub mode).
+        """
+        if not self._backend or not nodes:
+            return 0
+
+        # Flatten each node into scalar properties; nested metadata is
+        # JSON-encoded because graph properties must be scalar values.
+        props = [
+            {
+                "node_id": node.node_id,
+                "node_type": node.node_type,
+                "name": node.name,
+                "file": node.file,
+                "line_start": node.line_start,
+                "line_end": node.line_end,
+                "language": node.language,
+                "tenant_id": node.tenant_id,
+                "repo_id": node.repo_id,
+                "exported": node.exported,
+                "is_async": node.is_async,
+                "metadata": json.dumps(node.metadata) if node.metadata else "{}",
+            }
+            for node in nodes
+        ]
+        return await self._backend.write_nodes(props)
+
+    async def write_edges(self, edges: list[GraphEdge]) -> int:
+        """Upsert graph edges via the configured backend.
+
+        Args:
+            edges: Edges to write.
+
+        Returns:
+            Number of edges written (0 in stub mode).
+        """
+        if not self._backend or not edges:
+            return 0
+
+        # Group by upper-cased edge type: backends use the type as the
+        # relationship label and batch one write per type.
+        by_type: dict[str, list[dict[str, Any]]] = {}
+        for edge in edges:
+            props = {
+                "source_id": edge.source_id,
+                "target_id": edge.target_id,
+                "target_name": edge.target_name,
+                "tenant_id": edge.tenant_id,
+                "repo_id": edge.repo_id,
+                "edge_id": edge.edge_id,
+                "file": edge.file,
+                "line": edge.line,
+            }
+            by_type.setdefault(edge.edge_type.upper(), []).append(props)
+
+        return await self._backend.write_edges(by_type)
+
+    async def delete_file_nodes(self, tenant_id: str, repo_id: str, file_path: str) -> None:
+        """Delete all nodes and edges scoped to a specific file.
+
+        Args:
+            tenant_id: Tenant namespace.
+            repo_id: Repository identifier.
+            file_path: Repo-relative path of the file to remove.
+        """
+        if not self._backend:
+            return
+        await self._backend.delete_file_nodes(tenant_id, repo_id, file_path)
+
+
# ===========================================================================
# GraphBuilder — public interface
# ===========================================================================
@@ -361,8 +459,12 @@ class GraphBuilder:
)
"""
- def __init__(self, neo4j_driver: Any | None = None) -> None:
- """Initialise with optional Neo4j driver.
+ def __init__(
+ self,
+ neo4j_driver: Any | None = None,
+ backend: Any | None = None,
+ ) -> None:
+ """Initialise with optional graph backend or Neo4j driver.
The graph builder uses the parsers registry (``get_parser()``) to
dispatch to the tree-sitter AST parser for each language. COBOL is
@@ -370,11 +472,17 @@ def __init__(self, neo4j_driver: Any | None = None) -> None:
tree-sitter grammar is available for it.
Args:
- neo4j_driver: A ``neo4j.AsyncDriver`` instance. If None, the
- builder parses the codebase but skips all database writes.
+ neo4j_driver: A ``neo4j.AsyncDriver`` instance (legacy). If both
+ ``neo4j_driver`` and ``backend`` are None, the builder parses
+ the codebase but skips all database writes.
+ backend: A ``GraphBackend`` instance. Takes precedence over
+ ``neo4j_driver`` if both are provided.
"""
self._pkg_parser = PackageJsonParser()
- self._writer = Neo4jWriter(neo4j_driver)
+ if backend is not None:
+ self._writer = GraphWriter(backend)
+ else:
+ self._writer = Neo4jWriter(neo4j_driver)
def _parse_source(self, file_path: str, content: str, language: str) -> ParseResult:
"""Parse a source file using the language registry.
diff --git a/packages/codesteward-mcp/README.md b/packages/codesteward-mcp/README.md
index 9d5761d..237fc37 100644
--- a/packages/codesteward-mcp/README.md
+++ b/packages/codesteward-mcp/README.md
@@ -26,6 +26,9 @@ uv pip install "codesteward-mcp[graph]"
# All 14 languages
uv pip install "codesteward-mcp[graph-all]"
+
+# JanusGraph backend (alternative to Neo4j)
+uv pip install "codesteward-mcp[graph-all,janusgraph]"
```
## Tools
@@ -33,7 +36,7 @@ uv pip install "codesteward-mcp[graph-all]"
| Tool | Description |
| ---- | ----------- |
| `graph_rebuild` | Parse a repository into the structural graph |
-| `codebase_graph_query` | Query the graph (`lexical`, `referential`, `semantic`, `dependency`, raw Cypher) |
+| `codebase_graph_query` | Query the graph (`lexical`, `referential`, `semantic`, `dependency`, raw Cypher/Gremlin) |
| `graph_augment` | Add agent-inferred relationships to the graph |
| `graph_status` | Return graph metadata (node/edge counts, last build time) |
diff --git a/packages/codesteward-mcp/pyproject.toml b/packages/codesteward-mcp/pyproject.toml
index c631cd4..5ecb027 100644
--- a/packages/codesteward-mcp/pyproject.toml
+++ b/packages/codesteward-mcp/pyproject.toml
@@ -41,8 +41,10 @@ Changelog = "https://github.com/bitkaio/codesteward-mcp/blob/main/CHANGELOG.md"
[project.optional-dependencies]
# Re-export language extras so users can do: uv add codesteward-mcp[graph-all]
-graph = ["codesteward-graph[graph]"]
-graph-all = ["codesteward-graph[graph-all]"]
+graph = ["codesteward-graph[graph]"]
+graph-all = ["codesteward-graph[graph-all]"]
+# JanusGraph backend (alternative to Neo4j)
+janusgraph = ["codesteward-graph[janusgraph]"]
[tool.uv.sources]
codesteward-graph = { workspace = true }
diff --git a/packages/codesteward-mcp/src/codesteward/mcp/config.py b/packages/codesteward-mcp/src/codesteward/mcp/config.py
index 3c52605..701341f 100644
--- a/packages/codesteward-mcp/src/codesteward/mcp/config.py
+++ b/packages/codesteward-mcp/src/codesteward/mcp/config.py
@@ -2,7 +2,7 @@
Loaded from environment variables and/or a YAML config file. All settings
have sensible defaults so the server works out of the box without any config
-(Neo4j optional — parse-only stub mode when not configured).
+(graph backend optional — parse-only stub mode when not configured).
"""
@@ -32,11 +32,25 @@ class McpConfig(BaseSettings):
host: str = Field("0.0.0.0", description="HTTP server bind host")
port: int = Field(3000, description="HTTP server port")
+ # ── Graph backend ─────────────────────────────────────────────────────
+ graph_backend: str = Field(
+ "neo4j",
+ alias="GRAPH_BACKEND",
+ description="Graph database backend: 'neo4j' or 'janusgraph'",
+ )
+
# ── Neo4j (optional) ─────────────────────────────────────────────────────
neo4j_uri: str = Field("bolt://localhost:7687", alias="NEO4J_URI")
neo4j_user: str = Field("neo4j", alias="NEO4J_USER")
neo4j_password: str = Field("", alias="NEO4J_PASSWORD")
+ # ── JanusGraph (optional) ─────────────────────────────────────────────────
+ janusgraph_url: str = Field(
+ "ws://localhost:8182/gremlin",
+ alias="JANUSGRAPH_URL",
+ description="Gremlin Server WebSocket URL for JanusGraph",
+ )
+
# ── Default tenant / repo for single-user local deployments ──────────────
default_tenant_id: str = Field("local", description="Default tenant namespace")
default_repo_id: str = Field("", description="Default repo ID (set to the repo name)")
@@ -45,7 +59,7 @@ class McpConfig(BaseSettings):
description=(
"Default filesystem path to the repository. Used when graph_rebuild "
"is called without an explicit repo_path argument. In the Docker setup "
- "this matches the mount point defined in docker-compose.yml."
+ "this matches the mount point defined in the docker-compose files."
),
)
@@ -65,6 +79,18 @@ def neo4j_available(self) -> bool:
"""True when Neo4j credentials are configured."""
return bool(self.neo4j_password)
+    @property
+    def janusgraph_available(self) -> bool:
+        """True when JanusGraph is the selected backend.
+
+        Selection only — this does not verify that the Gremlin server is
+        actually reachable.
+        """
+        return self.graph_backend == "janusgraph"
+
+    @property
+    def graph_available(self) -> bool:
+        """True when any graph backend is configured and usable."""
+        if self.graph_backend == "janusgraph":
+            return True  # JanusGraph has no mandatory auth
+        # Neo4j (the default) requires credentials to be considered usable.
+        return self.neo4j_available
+
def load_config(config_file: str | None = None) -> McpConfig:
"""Load MCP server config, optionally merging a YAML file.
diff --git a/packages/codesteward-mcp/src/codesteward/mcp/server.py b/packages/codesteward-mcp/src/codesteward/mcp/server.py
index 4241f87..f643567 100644
--- a/packages/codesteward-mcp/src/codesteward/mcp/server.py
+++ b/packages/codesteward-mcp/src/codesteward/mcp/server.py
@@ -3,11 +3,12 @@
Exposes four tools over Model Context Protocol:
``graph_rebuild``
- Parse a repository and write the structural graph to Neo4j (or stub mode
- without Neo4j).
+ Parse a repository and write the structural graph to Neo4j or JanusGraph
+ (or stub mode without a graph backend).
``codebase_graph_query``
- Query the graph via named templates or raw Cypher.
+ Query the graph via named templates or raw query passthrough (Cypher for
+ Neo4j, Gremlin for JanusGraph).
``graph_augment``
Add agent-inferred relationships (confidence < 1.0) to the graph.
@@ -84,7 +85,7 @@ def build_mcp_server(config_file: str | None = None) -> tuple[FastMCP, Any]:
"code graph and query it for code intelligence.\n\n"
"TYPICAL WORKFLOW\n"
"1. Call graph_status to check whether a graph already exists for the "
- "repository. If neo4j_connected is false or last_build is null, proceed "
+ "repository. If backend_connected is false or last_build is null, proceed "
"to step 2.\n"
"2. Call graph_rebuild. In the standard Docker setup all arguments "
"can be omitted — the server already knows where the repository is "
@@ -97,7 +98,8 @@ def build_mcp_server(config_file: str | None = None) -> tuple[FastMCP, Any]:
" - semantic → find taint-flow paths (source → sink). Requires "
"codesteward-taint to have been run first. Returns empty until then.\n"
" - dependency → list external package dependencies\n"
- " - cypher → raw Cypher for anything not covered by the above\n"
+ " - cypher → raw Cypher (Neo4j backend only)\n"
+ " - gremlin → raw Gremlin (JanusGraph backend only)\n"
"4. Optionally call taint_analysis to trace how untrusted input propagates "
"to dangerous sinks (SQL, shell, file I/O, outbound HTTP). This tool is "
"only present when the codesteward-taint binary is installed. After it "
@@ -110,7 +112,10 @@ def build_mcp_server(config_file: str | None = None) -> tuple[FastMCP, Any]:
"IMPORTANT: graph_rebuild must complete successfully before "
"codebase_graph_query will return non-empty results. An empty result "
"from a query does not mean the code has no symbols — it may mean the "
- "graph has not been built yet."
+ "graph has not been built yet.\n\n"
+ "BACKENDS: The server supports Neo4j (default) and JanusGraph "
+ "(set GRAPH_BACKEND=janusgraph). Named query templates work identically "
+ "on both. Raw query passthrough uses the backend's native language."
),
)
@@ -125,7 +130,8 @@ async def graph_rebuild(
) -> str:
"""Build or incrementally update the codebase graph.
- Parse the repository and write LexicalNode / edge data to Neo4j.
+ Parse the repository and write LexicalNode / edge data to the
+ configured graph backend (Neo4j or JanusGraph).
Must be called before ``codebase_graph_query`` will return results.
``repo_path`` is an absolute path on the **server's** filesystem (not
@@ -133,8 +139,8 @@ async def graph_rebuild(
(``/repos/project`` in the Docker setup — the repository mounted via
the ``REPO_PATH`` environment variable in docker-compose).
- When Neo4j is not configured the parser still runs (stub mode) and
- returns node/edge counts, but nothing is persisted — queries will
+ When no graph backend is configured the parser still runs (stub mode)
+ and returns node/edge counts, but nothing is persisted — queries will
return empty results.
Full rebuild vs incremental:
@@ -156,7 +162,8 @@ async def graph_rebuild(
Returns:
YAML summary: mode (full|incremental), files_parsed, nodes (dict
- with total), edges (dict with total), duration_ms, neo4j_connected.
+ with total), edges (dict with total), duration_ms,
+ backend_connected, graph_backend.
"""
return str(await tool_graph_rebuild(
repo_path=repo_path or cfg.default_repo_path,
@@ -193,18 +200,19 @@ async def codebase_graph_query(
framework. Requires ``taint_analysis`` to have run first.
- ``dependency`` — list external package dependencies per file.
Results: package, type, referenced_from.
- - ``cypher`` — raw Cypher passthrough. The ``query`` parameter must
- be a full Cypher statement. Use ``$tenant_id``, ``$repo_id``, and
- ``$limit`` as parameters. Use this when the named types do not cover
- your query.
+ - ``cypher`` — raw Cypher passthrough (Neo4j backend only). The
+ ``query`` parameter must be a full Cypher statement. Use
+ ``$tenant_id``, ``$repo_id``, and ``$limit`` as parameters.
+ - ``gremlin`` — raw Gremlin passthrough (JanusGraph backend only).
+ The ``query`` parameter must be a Gremlin traversal string.
The ``query`` parameter is a substring filter on name or file path for
- named query types (pass ``""`` for no filter). For ``cypher`` it is
- the full Cypher statement.
+ named query types (pass ``""`` for no filter). For ``cypher`` /
+ ``gremlin`` it is the full native query statement.
Args:
query_type: One of ``lexical``, ``referential``, ``semantic``,
- ``dependency``, or ``cypher``.
+ ``dependency``, ``cypher`` (Neo4j), or ``gremlin`` (JanusGraph).
query: Substring filter or raw Cypher statement (see above).
tenant_id: Tenant namespace. Defaults to server default.
repo_id: Repository identifier. Defaults to server default.
@@ -287,7 +295,7 @@ async def graph_status(
"""Return metadata about the current graph state.
Call this first before querying. If ``last_build`` is null or
- ``neo4j_connected`` is false, call ``graph_rebuild`` before attempting
+ ``backend_connected`` is false, call ``graph_rebuild`` before attempting
``codebase_graph_query``.
This call is cheap — it reads a local workspace YAML file and does at
@@ -298,9 +306,10 @@ async def graph_status(
repo_id: Repository identifier. Defaults to server default.
Returns:
- YAML dict with keys: ``neo4j_connected`` (bool), ``last_build``
- (ISO timestamp or null), ``nodes`` (dict with ``total`` count or
- null), ``edges`` (dict with ``total`` count or null).
+ YAML dict with keys: ``backend_connected`` (bool),
+ ``graph_backend`` (str), ``last_build`` (ISO timestamp or null),
+ ``nodes`` (dict with ``total`` count or null), ``edges`` (dict
+ with ``total`` count or null).
"""
return str(await tool_graph_status(
tenant_id=tenant_id or cfg.default_tenant_id,
diff --git a/packages/codesteward-mcp/src/codesteward/mcp/tools/graph.py b/packages/codesteward-mcp/src/codesteward/mcp/tools/graph.py
index cda076b..e6abd08 100644
--- a/packages/codesteward-mcp/src/codesteward/mcp/tools/graph.py
+++ b/packages/codesteward-mcp/src/codesteward/mcp/tools/graph.py
@@ -4,12 +4,13 @@
``graph_rebuild``
Parse a repository (or a set of changed files) and write the structural
- graph to Neo4j. Works in stub mode (parse-only, no Neo4j) when Neo4j
- credentials are not configured.
+ graph to the configured backend (Neo4j or JanusGraph). Works in stub
+ mode (parse-only) when no backend is configured.
``codebase_graph_query``
Query the graph via named templates (lexical / referential / semantic /
- dependency) or raw Cypher passthrough. Returns YAML.
+ dependency) or raw query passthrough (Cypher for Neo4j, Gremlin for
+ JanusGraph). Returns YAML.
``graph_augment``
Add agent-inferred relationships (confidence < 1.0) to the graph.
@@ -17,7 +18,7 @@
``graph_status``
Return metadata about the current graph: node/edge counts, last build
- timestamp, Neo4j connectivity.
+ timestamp, backend connectivity.
"""
@@ -27,61 +28,13 @@
import structlog
import yaml
+from codesteward.engine.backends import get_backend
+from codesteward.engine.backends.base import GraphBackend
from codesteward.engine.graph_builder import GraphBuilder
from codesteward.mcp.config import McpConfig
log = structlog.get_logger()
-# ---------------------------------------------------------------------------
-# Cypher templates for named query types
-# ---------------------------------------------------------------------------
-
-_CYPHER_TEMPLATES: dict[str, str] = {
- "lexical": """
- MATCH (n:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
- WHERE ($filter = '' OR n.name CONTAINS $filter OR n.file CONTAINS $filter)
- RETURN n.node_type AS type, n.name AS name, n.file AS file,
- n.line_start AS line_start, n.line_end AS line_end,
- n.language AS language, n.is_async AS is_async
- ORDER BY n.file, n.line_start
- LIMIT $limit
- """,
- "referential": """
- MATCH (src:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
- -[r:CALLS|IMPORTS|EXTENDS|GUARDED_BY|PROTECTED_BY]->(tgt)
- WHERE ($filter = '' OR src.name CONTAINS $filter OR src.file CONTAINS $filter)
- RETURN src.name AS from_name, src.file AS from_file,
- type(r) AS edge_type,
- tgt.name AS to_name, tgt.file AS to_file,
- tgt.node_type AS to_node_type,
- r.line AS line
- ORDER BY src.file, r.line
- LIMIT $limit
- """,
- "semantic": """
- MATCH (src:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
- -[r:TAINT_FLOW]->(tgt:LexicalNode)
- WHERE ($filter = '' OR src.name CONTAINS $filter OR src.file CONTAINS $filter)
- AND NOT r.sanitized
- RETURN src.name AS source_name, src.file AS source_file,
- tgt.name AS sink_name, tgt.file AS sink_file,
- r.cwe AS cwe, r.hops AS hops,
- r.level AS level, r.framework AS framework
- ORDER BY r.hops ASC, src.file ASC
- LIMIT $limit
- """,
- "dependency": """
- MATCH (src:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id,
- node_type: 'file'})
- -[r:DEPENDS_ON]->(pkg:LexicalNode)
- WHERE ($filter = '' OR pkg.name CONTAINS $filter)
- RETURN DISTINCT pkg.name AS package, pkg.node_type AS type,
- src.file AS referenced_from
- ORDER BY pkg.name
- LIMIT $limit
- """,
-}
-
_ALLOWED_EDGE_TYPES = frozenset({
"calls", "guarded_by", "protected_by", "taint_flow",
"type_equivalent", "migration_target", "audit_sink",
@@ -90,20 +43,38 @@
# ---------------------------------------------------------------------------
-# Neo4j driver helpers
+# Backend factory
# ---------------------------------------------------------------------------
-def _make_async_driver(cfg: McpConfig) -> Any | None:
- """Create an async Neo4j driver from config, or None if not configured."""
+def _make_backend(cfg: McpConfig) -> GraphBackend | None:
+ """Create a graph backend from config, or None if not configured."""
+ if cfg.graph_backend == "janusgraph":
+ try:
+ backend = get_backend("janusgraph", url=cfg.janusgraph_url)
+ if not backend.is_connected():
+ log.warning("janusgraph_not_connected", url=cfg.janusgraph_url)
+ return None
+ return backend
+ except Exception as exc:
+ log.error("janusgraph_backend_init_failed", error=str(exc))
+ return None
+
+ # Default: Neo4j
if not cfg.neo4j_available:
return None
try:
- import neo4j
- return neo4j.AsyncGraphDatabase.driver(
- cfg.neo4j_uri, auth=(cfg.neo4j_user, cfg.neo4j_password)
+ backend = get_backend(
+ "neo4j",
+ uri=cfg.neo4j_uri,
+ user=cfg.neo4j_user,
+ password=cfg.neo4j_password,
)
+ if not backend.is_connected():
+ log.warning("neo4j_not_connected", uri=cfg.neo4j_uri)
+ return None
+ return backend
except Exception as exc:
- log.error("neo4j_driver_init_failed", error=str(exc))
+ log.error("neo4j_backend_init_failed", error=str(exc))
return None
@@ -122,14 +93,14 @@ async def tool_graph_rebuild(
Args:
repo_path: Absolute path to the cloned repository on disk.
- tenant_id: Tenant namespace for Neo4j isolation.
+ tenant_id: Tenant namespace for graph isolation.
repo_id: Repository identifier — must be stable across rebuilds.
changed_files: Repo-relative paths to re-parse for incremental mode.
Pass ``None`` for a full rebuild.
cfg: Server configuration.
Returns:
- YAML summary with node/edge counts, duration, and Neo4j status.
+ YAML summary with node/edge counts, duration, and backend status.
"""
mode = "incremental" if changed_files is not None else "full"
log.info(
@@ -140,11 +111,11 @@ async def tool_graph_rebuild(
mode=mode,
)
- driver = _make_async_driver(cfg)
+ backend = _make_backend(cfg)
t0 = time.monotonic()
try:
- builder = GraphBuilder(neo4j_driver=driver)
+ builder = GraphBuilder(backend=backend)
summary = await builder.build_graph(
repo_path=repo_path,
tenant_id=tenant_id,
@@ -159,12 +130,14 @@ async def tool_graph_rebuild(
default_flow_style=False,
))
finally:
- if driver is not None:
- await driver.close()
+ if backend is not None:
+ await backend.close()
summary["mode"] = mode
summary["duration_ms"] = round((time.monotonic() - t0) * 1000)
- summary["neo4j_connected"] = driver is not None
+ summary["neo4j_connected"] = backend is not None and cfg.graph_backend == "neo4j"
+ summary["graph_backend"] = cfg.graph_backend
+ summary["backend_connected"] = backend is not None
# Persist lightweight metadata to workspace if base dir exists
workspace = Path(cfg.workspace_base) / tenant_id / repo_id
@@ -199,22 +172,27 @@ async def tool_codebase_graph_query(
Args:
query_type: One of ``lexical``, ``referential``, ``semantic``,
- ``dependency``, or ``cypher`` (raw passthrough).
- query: Filter substring or raw Cypher statement.
+ ``dependency``, or the backend's raw query language
+ (``cypher`` for Neo4j, ``gremlin`` for JanusGraph).
+ query: Filter substring or raw query statement.
tenant_id: Tenant namespace.
repo_id: Repository identifier.
limit: Maximum rows to return.
cfg: Server configuration.
Returns:
- YAML-formatted results with a ``stub`` key when Neo4j is unavailable.
+ YAML-formatted results with a ``stub`` key when no backend is available.
"""
- driver = _make_async_driver(cfg)
+ backend = _make_backend(cfg)
+ raw_lang = backend.raw_query_language if backend else "cypher"
- if driver is None:
+ if backend is None:
return str(yaml.safe_dump({
"stub": True,
- "reason": "Neo4j not configured — set NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD",
+ "reason": (
+ "Graph backend not configured — set NEO4J_PASSWORD (for Neo4j) "
+ "or GRAPH_BACKEND=janusgraph (for JanusGraph)"
+ ),
"query_type": query_type,
"filter": query,
"tenant_id": tenant_id,
@@ -223,30 +201,40 @@ async def tool_codebase_graph_query(
"results": [],
}, default_flow_style=False))
- # Build Cypher + params
- if query_type == "cypher":
- cypher = query
- params: dict[str, Any] = {
- "tenant_id": tenant_id, "repo_id": repo_id, "limit": limit
- }
- else:
- template = _CYPHER_TEMPLATES.get(query_type)
- if template is None:
- await driver.close()
- return str(yaml.safe_dump({
- "error": f"unknown query_type '{query_type}'",
- "valid_types": list(_CYPHER_TEMPLATES) + ["cypher"],
- }, default_flow_style=False))
- cypher = template
- params = {
- "tenant_id": tenant_id, "repo_id": repo_id,
- "filter": query, "limit": limit,
- }
-
try:
- async with driver.session() as session:
- result = await session.run(cypher, **params)
- records = [dict(record) for record in await result.data()]
+ # Raw query passthrough (cypher or gremlin)
+ if query_type in ("cypher", "gremlin"):
+ if query_type != raw_lang:
+ await backend.close()
+ return str(yaml.safe_dump({
+ "error": (
+ f"Raw query language mismatch: got '{query_type}' but "
+ f"active backend uses '{raw_lang}'"
+ ),
+ "active_backend": backend.backend_name,
+ "expected_query_type": raw_lang,
+ }, default_flow_style=False))
+
+ params: dict[str, Any] = {
+ "tenant_id": tenant_id, "repo_id": repo_id, "limit": limit
+ }
+ records = await backend.query_raw(query, params)
+ else:
+ # Named query template
+ try:
+ records = await backend.query_named(
+ query_type=query_type,
+ tenant_id=tenant_id,
+ repo_id=repo_id,
+ filter_str=query,
+ limit=limit,
+ )
+ except ValueError as exc:
+ await backend.close()
+ return str(yaml.safe_dump({
+ "error": str(exc),
+ "valid_types": ["lexical", "referential", "semantic", "dependency", raw_lang],
+ }, default_flow_style=False))
output: dict[str, Any] = {
"query_type": query_type,
@@ -265,7 +253,7 @@ async def tool_codebase_graph_query(
default_flow_style=False,
))
finally:
- await driver.close()
+ await backend.close()
async def tool_graph_augment(
@@ -291,7 +279,7 @@ async def tool_graph_augment(
"""
from codesteward.engine.graph_builder import GraphEdge
- driver = _make_async_driver(cfg)
+ backend = _make_backend(cfg)
source_tag = f"agent:{agent_id}"
written: list[dict[str, Any]] = []
skipped: list[dict[str, Any]] = []
@@ -336,37 +324,24 @@ async def tool_graph_augment(
source=source_tag,
)
- if driver is not None:
+ if backend is not None:
try:
- rel_type = edge_type.upper()
- cypher = f"""
- MATCH (src:LexicalNode {{node_id: $source_id}})
- MERGE (tgt:LexicalNode {{node_id: $target_id}})
- ON CREATE SET tgt.name = $target_name, tgt.node_type = 'external',
- tgt.tenant_id = $tenant_id, tgt.repo_id = $repo_id,
- tgt.confidence = $confidence, tgt.source = $source
- MERGE (src)-[r:{rel_type} {{edge_id: $edge_id}}]->(tgt)
- SET r.file = $file, r.line = $line,
- r.confidence = $confidence, r.source = $source,
- r.rationale = $rationale
- """
- async with driver.session() as session:
- await session.run(
- cypher,
- source_id=edge.source_id,
- target_id=edge.target_id,
- target_name=edge.target_name,
- tenant_id=edge.tenant_id,
- repo_id=edge.repo_id,
- edge_id=edge.edge_id,
- file=edge.file or "",
- line=edge.line,
- confidence=edge.confidence,
- source=edge.source,
- rationale=rationale,
- )
+ await backend.write_augment_edge(
+ edge_type=edge_type,
+ source_id=edge.source_id,
+ target_id=edge.target_id,
+ target_name=edge.target_name,
+ tenant_id=edge.tenant_id,
+ repo_id=edge.repo_id,
+ edge_id=edge.edge_id,
+ file=edge.file or "",
+ line=edge.line,
+ confidence=edge.confidence,
+ source=edge.source,
+ rationale=rationale,
+ )
except Exception as exc:
- skipped.append({"item": item, "reason": f"neo4j write failed: {exc}"})
+ skipped.append({"item": item, "reason": f"backend write failed: {exc}"})
continue
written.append({
@@ -377,8 +352,8 @@ async def tool_graph_augment(
"confidence": confidence,
})
- if driver is not None:
- await driver.close()
+ if backend is not None:
+ await backend.close()
return str(yaml.safe_dump({
"status": "ok" if not skipped else "partial",
@@ -387,7 +362,9 @@ async def tool_graph_augment(
"skipped": len(skipped),
"edges": written,
"skip_details": skipped,
- "neo4j_connected": driver is not None,
+ "neo4j_connected": backend is not None and cfg.graph_backend == "neo4j",
+ "graph_backend": cfg.graph_backend,
+ "backend_connected": backend is not None,
}, default_flow_style=False, sort_keys=True))
@@ -398,7 +375,7 @@ async def tool_graph_status(
) -> str:
"""Return metadata about the current graph state.
- Checks Neo4j connectivity, reads workspace build metadata, and returns
+ Checks backend connectivity, reads workspace build metadata, and returns
node/edge counts plus last build timestamp.
Args:
@@ -407,12 +384,14 @@ async def tool_graph_status(
cfg: Server configuration.
Returns:
- YAML dict with ``neo4j_connected``, ``last_build``, ``nodes``, ``edges``.
+        YAML dict with ``backend_connected``, ``graph_backend``, ``neo4j_connected``
+        (legacy), ``last_build``, ``nodes``, ``edges``.
"""
status: dict[str, Any] = {
"tenant_id": tenant_id,
"repo_id": repo_id,
+ "graph_backend": cfg.graph_backend,
"neo4j_connected": False,
+ "backend_connected": False,
"last_build": None,
"nodes": None,
"edges": None,
@@ -429,26 +408,19 @@ async def tool_graph_status(
except Exception:
pass
- # Check Neo4j connectivity
- driver = _make_async_driver(cfg)
- if driver is not None:
+ # Check backend connectivity
+ backend = _make_backend(cfg)
+ if backend is not None:
try:
- async with driver.session() as session:
- result = await session.run(
- """
- MATCH (n:LexicalNode {tenant_id: $tenant_id, repo_id: $repo_id})
- RETURN count(n) AS node_count
- """,
- tenant_id=tenant_id,
- repo_id=repo_id,
- )
- record = await result.single()
- if record:
- status["nodes"] = {"total": record["node_count"]}
- status["neo4j_connected"] = True
+ node_count = await backend.count_nodes(tenant_id, repo_id)
+ if node_count is not None:
+ status["nodes"] = {"total": node_count}
+ status["backend_connected"] = True
+ if cfg.graph_backend == "neo4j":
+ status["neo4j_connected"] = True
except Exception as exc:
- status["neo4j_error"] = str(exc)
+ status["backend_error"] = str(exc)
finally:
- await driver.close()
+ await backend.close()
return str(yaml.safe_dump(status, default_flow_style=False, sort_keys=True))
diff --git a/templates/AGENTS.md b/templates/AGENTS.md
index ff85748..0d349e6 100644
--- a/templates/AGENTS.md
+++ b/templates/AGENTS.md
@@ -51,7 +51,8 @@ codebase_graph_query(query_type="referential", query="authenticate")
| `referential` | Find call/import/extends/auth-guard relationships |
| `semantic` | Read taint-flow findings (run `taint_analysis` first; returns empty until then) |
| `dependency` | List external package dependencies |
-| `cypher` | Write a custom Cypher query for anything not covered above |
+| `cypher` | Raw Cypher query (Neo4j backend) |
+| `gremlin` | Raw Gremlin query (JanusGraph backend) |
## Taint-flow analysis
diff --git a/templates/CLAUDE.md b/templates/CLAUDE.md
index 6d2df5a..a4ce7df 100644
--- a/templates/CLAUDE.md
+++ b/templates/CLAUDE.md
@@ -51,7 +51,8 @@ codebase_graph_query(query_type="referential", query="authenticate")
| `referential` | Find call/import/extends/auth-guard relationships |
| `semantic` | Read taint-flow findings (run `taint_analysis` first; returns empty until then) |
| `dependency` | List external package dependencies |
-| `cypher` | Write a custom Cypher query for anything not covered above |
+| `cypher` | Raw Cypher query (Neo4j backend) |
+| `gremlin` | Raw Gremlin query (JanusGraph backend) |
## Taint-flow analysis
diff --git a/templates/global-claude-code/CLAUDE.md b/templates/global-claude-code/CLAUDE.md
index c05477b..a923956 100644
--- a/templates/global-claude-code/CLAUDE.md
+++ b/templates/global-claude-code/CLAUDE.md
@@ -33,7 +33,7 @@ Read `last_build` in the response:
graph_rebuild(repo_path="", repo_id="")
```
-This parses every source file in the repository and writes the structural graph to Neo4j.
+This parses every source file in the repository and writes the structural graph to the configured backend (Neo4j or JanusGraph).
Report back: node count, edge count, languages detected.
## Step 4 — Answer structural questions via the graph
@@ -62,7 +62,8 @@ after identifying it via the graph.
| `referential` | Find call/import/extends/auth-guard relationships |
| `semantic` | Read taint-flow findings (run `taint_analysis` first; returns empty until then) |
| `dependency` | List external package dependencies |
-| `cypher` | Write a custom Cypher query for anything not covered above |
+| `cypher` | Raw Cypher query (Neo4j backend) |
+| `gremlin` | Raw Gremlin query (JanusGraph backend) |
## Taint-flow analysis (optional)
diff --git a/templates/global-codex/AGENTS.md b/templates/global-codex/AGENTS.md
index 619679b..f001e1e 100644
--- a/templates/global-codex/AGENTS.md
+++ b/templates/global-codex/AGENTS.md
@@ -33,7 +33,7 @@ Read `last_build` in the response:
graph_rebuild(repo_path="", repo_id="")
```
-This parses every source file in the repository and writes the structural graph to Neo4j.
+This parses every source file in the repository and writes the structural graph to the configured backend (Neo4j or JanusGraph).
Report back: node count, edge count, languages detected.
## Step 4 — Answer structural questions via the graph
@@ -62,7 +62,8 @@ after identifying it via the graph.
| `referential` | Find call/import/extends/auth-guard relationships |
| `semantic` | Read taint-flow findings (run `taint_analysis` first; returns empty until then) |
| `dependency` | List external package dependencies |
-| `cypher` | Write a custom Cypher query for anything not covered above |
+| `cypher` | Raw Cypher query (Neo4j backend) |
+| `gremlin` | Raw Gremlin query (JanusGraph backend) |
## Important: empty results do not mean no symbols
diff --git a/templates/global-codex/config-snippet.yaml b/templates/global-codex/config-snippet.yaml
index 0918cab..073d3e2 100644
--- a/templates/global-codex/config-snippet.yaml
+++ b/templates/global-codex/config-snippet.yaml
@@ -7,7 +7,12 @@
#
# MCP server is spawned as a subprocess via stdio — no Docker, no volume mounts.
# The codesteward-taint binary is auto-detected on PATH if installed.
+#
+# Two backend options:
+# Neo4j (default): set NEO4J_URI/NEO4J_USER/NEO4J_PASSWORD
+# JanusGraph: set GRAPH_BACKEND=janusgraph and JANUSGRAPH_URL
+# ── Neo4j backend (default) ────────────────────────────────────────────────
mcp_servers:
codesteward:
command: uvx
@@ -19,3 +24,17 @@ mcp_servers:
NEO4J_URI: "bolt://localhost:7687"
NEO4J_USER: "neo4j"
NEO4J_PASSWORD: "your-neo4j-password"
+
+# ── JanusGraph backend (alternative) ───────────────────────────────────────
+# Uncomment below (and comment out the Neo4j block above) to use JanusGraph:
+#
+# mcp_servers:
+# codesteward:
+# command: uvx
+# args:
+# - "codesteward-mcp[graph-all,janusgraph]"
+# - "--transport"
+# - "stdio"
+# env:
+# GRAPH_BACKEND: "janusgraph"
+# JANUSGRAPH_URL: "ws://localhost:8182/gremlin"
diff --git a/tests/conftest.py b/tests/conftest.py
index 7a9af6c..6ac00ed 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -31,3 +31,20 @@ def cfg_with_neo4j() -> McpConfig:
default_tenant_id="test-tenant",
default_repo_id="test-repo",
)
+
+
+@pytest.fixture
+def cfg_with_janusgraph() -> McpConfig:
+ """McpConfig with JanusGraph as graph backend.
+
+ Note: does not actually connect — tests using this fixture must mock
+ the JanusGraph backend.
+ """
+ return McpConfig(
+ graph_backend="janusgraph",
+ janusgraph_url="ws://localhost:8182/gremlin",
+ neo4j_password="",
+ workspace_base="/tmp/codesteward-test-workspace",
+ default_tenant_id="test-tenant",
+ default_repo_id="test-repo",
+ )
diff --git a/tests/test_mcp/test_graph_tools.py b/tests/test_mcp/test_graph_tools.py
index a5bb2a7..724d495 100644
--- a/tests/test_mcp/test_graph_tools.py
+++ b/tests/test_mcp/test_graph_tools.py
@@ -55,7 +55,7 @@ async def test_stub_mode_no_neo4j(self, cfg: McpConfig, tmp_path: Path) -> None:
data = yaml.safe_load(result)
assert data["mode"] == "full"
- assert data["neo4j_connected"] is False
+ assert data["backend_connected"] is False
assert "duration_ms" in data
async def test_incremental_mode(self, cfg: McpConfig, tmp_path: Path) -> None:
@@ -115,9 +115,35 @@ async def test_error_returns_yaml_error(self, cfg: McpConfig, tmp_path: Path) ->
# ---------------------------------------------------------------------------
+def _mock_neo4j_backend(records=None):
+ """Create a mock Neo4j backend for testing."""
+ backend = MagicMock()
+ backend.is_connected.return_value = True
+ backend.backend_name = "neo4j"
+ backend.raw_query_language = "cypher"
+ backend.close = AsyncMock()
+ backend.query_named = AsyncMock(return_value=records or [])
+ backend.query_raw = AsyncMock(return_value=records or [])
+ backend.count_nodes = AsyncMock(return_value=0)
+ return backend
+
+
+def _mock_janusgraph_backend(records=None):
+ """Create a mock JanusGraph backend for testing."""
+ backend = MagicMock()
+ backend.is_connected.return_value = True
+ backend.backend_name = "janusgraph"
+ backend.raw_query_language = "gremlin"
+ backend.close = AsyncMock()
+ backend.query_named = AsyncMock(return_value=records or [])
+ backend.query_raw = AsyncMock(return_value=records or [])
+ backend.count_nodes = AsyncMock(return_value=0)
+ return backend
+
+
class TestToolCodebaseGraphQuery:
async def test_stub_when_no_neo4j(self, cfg: McpConfig) -> None:
- """Returns stub response when Neo4j is not configured."""
+ """Returns stub response when no backend is configured."""
result = await tool_codebase_graph_query(
query_type="lexical",
query="",
@@ -133,13 +159,12 @@ async def test_stub_when_no_neo4j(self, cfg: McpConfig) -> None:
async def test_unknown_query_type_returns_error(self, cfg_with_neo4j: McpConfig) -> None:
"""Unknown query_type returns an error dict (no crash)."""
- mock_driver = MagicMock()
- mock_session = AsyncMock()
- mock_driver.session.return_value.__aenter__ = AsyncMock(return_value=mock_session)
- mock_driver.session.return_value.__aexit__ = AsyncMock(return_value=False)
- mock_driver.close = AsyncMock()
+ backend = _mock_neo4j_backend()
+ backend.query_named = AsyncMock(
+ side_effect=ValueError("unknown query_type 'invalid_type'")
+ )
- with patch("codesteward.mcp.tools.graph._make_async_driver", return_value=mock_driver):
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
result = await tool_codebase_graph_query(
query_type="invalid_type",
query="",
@@ -160,17 +185,9 @@ async def test_lexical_query_returns_results(self, cfg_with_neo4j: McpConfig) ->
"line_start": 10, "line_end": 20, "language": "python", "is_async": False}
]
- mock_result = AsyncMock()
- mock_result.data = AsyncMock(return_value=mock_records)
- mock_session = AsyncMock()
- mock_session.run = AsyncMock(return_value=mock_result)
+ backend = _mock_neo4j_backend(records=mock_records)
- mock_driver = MagicMock()
- mock_driver.session.return_value.__aenter__ = AsyncMock(return_value=mock_session)
- mock_driver.session.return_value.__aexit__ = AsyncMock(return_value=False)
- mock_driver.close = AsyncMock()
-
- with patch("codesteward.mcp.tools.graph._make_async_driver", return_value=mock_driver):
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
result = await tool_codebase_graph_query(
query_type="lexical",
query="my_func",
@@ -184,6 +201,145 @@ async def test_lexical_query_returns_results(self, cfg_with_neo4j: McpConfig) ->
assert data["total"] == 1
assert data["results"][0]["name"] == "my_func"
+ async def test_raw_cypher_on_neo4j(self, cfg_with_neo4j: McpConfig) -> None:
+ """Raw Cypher passthrough works on Neo4j backend."""
+ backend = _mock_neo4j_backend(records=[{"count": 42}])
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_codebase_graph_query(
+ query_type="cypher",
+ query="MATCH (n) RETURN count(n) AS count",
+ tenant_id="t1",
+ repo_id="r1",
+ limit=10,
+ cfg=cfg_with_neo4j,
+ )
+
+ data = yaml.safe_load(result)
+ assert data["total"] == 1
+ backend.query_raw.assert_called_once()
+
+ async def test_raw_gremlin_rejected_on_neo4j(self, cfg_with_neo4j: McpConfig) -> None:
+ """Gremlin query type is rejected when Neo4j backend is active."""
+ backend = _mock_neo4j_backend()
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_codebase_graph_query(
+ query_type="gremlin",
+ query="g.V().count()",
+ tenant_id="t1",
+ repo_id="r1",
+ limit=10,
+ cfg=cfg_with_neo4j,
+ )
+
+ data = yaml.safe_load(result)
+ assert "error" in data
+ assert "mismatch" in data["error"]
+
+
+# ---------------------------------------------------------------------------
+# codebase_graph_query — JanusGraph backend
+# ---------------------------------------------------------------------------
+
+
+class TestToolCodebaseGraphQueryJanusGraph:
+ async def test_lexical_query_on_janusgraph(
+ self, cfg_with_janusgraph: McpConfig
+ ) -> None:
+ """Lexical query works on JanusGraph backend."""
+ mock_records = [
+ {"type": "function", "name": "handler", "file": "routes.go",
+ "line_start": 5, "line_end": 15, "language": "go", "is_async": False}
+ ]
+ backend = _mock_janusgraph_backend(records=mock_records)
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_codebase_graph_query(
+ query_type="lexical",
+ query="handler",
+ tenant_id="t1",
+ repo_id="r1",
+ limit=50,
+ cfg=cfg_with_janusgraph,
+ )
+
+ data = yaml.safe_load(result)
+ assert data["total"] == 1
+ assert data["results"][0]["name"] == "handler"
+ backend.query_named.assert_called_once_with(
+ query_type="lexical",
+ tenant_id="t1",
+ repo_id="r1",
+ filter_str="handler",
+ limit=50,
+ )
+
+ async def test_raw_gremlin_on_janusgraph(
+ self, cfg_with_janusgraph: McpConfig
+ ) -> None:
+ """Raw Gremlin passthrough works on JanusGraph backend."""
+ backend = _mock_janusgraph_backend(records=[{"result": 42}])
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_codebase_graph_query(
+ query_type="gremlin",
+ query="g.V().count()",
+ tenant_id="t1",
+ repo_id="r1",
+ limit=10,
+ cfg=cfg_with_janusgraph,
+ )
+
+ data = yaml.safe_load(result)
+ assert data["total"] == 1
+ backend.query_raw.assert_called_once()
+
+ async def test_raw_cypher_rejected_on_janusgraph(
+ self, cfg_with_janusgraph: McpConfig
+ ) -> None:
+ """Cypher query type is rejected when JanusGraph backend is active."""
+ backend = _mock_janusgraph_backend()
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_codebase_graph_query(
+ query_type="cypher",
+ query="MATCH (n) RETURN n",
+ tenant_id="t1",
+ repo_id="r1",
+ limit=10,
+ cfg=cfg_with_janusgraph,
+ )
+
+ data = yaml.safe_load(result)
+ assert "error" in data
+ assert "mismatch" in data["error"]
+
+ async def test_referential_query_on_janusgraph(
+ self, cfg_with_janusgraph: McpConfig
+ ) -> None:
+ """Referential query works on JanusGraph backend."""
+ mock_records = [
+ {"from_name": "main", "from_file": "app.py", "edge_type": "CALLS",
+ "to_name": "helper", "to_file": "utils.py", "to_node_type": "function",
+ "line": 10}
+ ]
+ backend = _mock_janusgraph_backend(records=mock_records)
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_codebase_graph_query(
+ query_type="referential",
+ query="main",
+ tenant_id="t1",
+ repo_id="r1",
+ limit=50,
+ cfg=cfg_with_janusgraph,
+ )
+
+ data = yaml.safe_load(result)
+ assert data["total"] == 1
+ assert data["results"][0]["edge_type"] == "CALLS"
+
# ---------------------------------------------------------------------------
# graph_augment
@@ -192,7 +348,7 @@ async def test_lexical_query_returns_results(self, cfg_with_neo4j: McpConfig) ->
class TestToolGraphAugment:
async def test_stub_mode_writes_without_neo4j(self, cfg: McpConfig) -> None:
- """In stub mode (no Neo4j) valid edges are accepted and returned."""
+ """In stub mode (no backend) valid edges are accepted and returned."""
result = await tool_graph_augment(
tenant_id="t1",
repo_id="r1",
@@ -309,6 +465,35 @@ async def test_partial_status_on_mixed_input(self, cfg: McpConfig) -> None:
assert data["written"] == 1
assert data["skipped"] == 1
+ async def test_augment_with_janusgraph_backend(
+ self, cfg_with_janusgraph: McpConfig
+ ) -> None:
+ """graph_augment works with JanusGraph backend."""
+ backend = _mock_janusgraph_backend()
+ backend.write_augment_edge = AsyncMock()
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_graph_augment(
+ tenant_id="t1",
+ repo_id="r1",
+ agent_id="security-agent",
+ additions=[
+ {
+ "source_id": "fn:t1:r1:app.go:handler",
+ "edge_type": "calls",
+ "target_id": "fn:t1:r1:db.go:query",
+ "target_name": "query",
+ "confidence": 0.9,
+ }
+ ],
+ cfg=cfg_with_janusgraph,
+ )
+
+ data = yaml.safe_load(result)
+ assert data["written"] == 1
+ assert data["graph_backend"] == "janusgraph"
+ backend.write_augment_edge.assert_called_once()
+
# ---------------------------------------------------------------------------
# graph_status
@@ -317,7 +502,7 @@ async def test_partial_status_on_mixed_input(self, cfg: McpConfig) -> None:
class TestToolGraphStatus:
async def test_stub_mode_no_neo4j(self, cfg: McpConfig, tmp_path: Path) -> None:
- """Returns status with neo4j_connected=False when no driver."""
+ """Returns status with backend_connected=False when no backend."""
cfg = McpConfig(
neo4j_password="",
workspace_base=str(tmp_path),
@@ -328,7 +513,7 @@ async def test_stub_mode_no_neo4j(self, cfg: McpConfig, tmp_path: Path) -> None:
result = await tool_graph_status(tenant_id="t1", repo_id="r1", cfg=cfg)
data = yaml.safe_load(result)
- assert data["neo4j_connected"] is False
+ assert data["backend_connected"] is False
assert data["tenant_id"] == "t1"
assert data["repo_id"] == "r1"
@@ -358,3 +543,21 @@ async def test_reads_workspace_metadata(self, cfg: McpConfig, tmp_path: Path) ->
assert data["last_build"] == "2026-01-01T12:00:00"
assert data["nodes"]["total"] == 42
+
+ async def test_status_with_janusgraph_backend(
+ self, cfg_with_janusgraph: McpConfig, tmp_path: Path
+ ) -> None:
+ """graph_status works with JanusGraph backend."""
+ cfg_with_janusgraph.workspace_base = str(tmp_path)
+ backend = _mock_janusgraph_backend()
+ backend.count_nodes = AsyncMock(return_value=100)
+
+ with patch("codesteward.mcp.tools.graph._make_backend", return_value=backend):
+ result = await tool_graph_status(
+ tenant_id="t1", repo_id="r1", cfg=cfg_with_janusgraph
+ )
+
+ data = yaml.safe_load(result)
+ assert data["backend_connected"] is True
+ assert data["graph_backend"] == "janusgraph"
+ assert data["nodes"]["total"] == 100