4 changes: 3 additions & 1 deletion pyproject.toml
@@ -19,6 +19,8 @@ dependencies = [
"instructor",
"gitpython",
"pillow>=10.4.0",
"networkx",
"scikit-network",

# LangChain ecosystem
"langchain",
@@ -140,7 +142,7 @@ dev = [
# Core dev tools (shared with minimal)
"ruff",
"pyright>=1.1.403",
"pre-commit>=4.3.0",
"pre-commit>=4.3.0",
"pytest",
"pytest-xdist[psutil]",
"pytest-asyncio",
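For orientation, a minimal sketch of how the two new dependencies cooperate in the graph code below (the edge list here is illustrative; the real code feeds stringified node UUIDs): scikit-network supplies the Leiden community detection, and networkx handles the per-cluster path finding.

from sknetwork.clustering import Leiden
from sknetwork.data import from_edge_list

graph = from_edge_list([("a", "b"), ("b", "c"), ("x", "y")], directed=True)
labels = Leiden(random_state=42).fit_predict(graph.adjacency)
# labels[i] is the community of graph.names[i]; here "a", "b", "c" would
# typically land in one community and "x", "y" in another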
222 changes: 183 additions & 39 deletions src/ragas/testset/graph.py
@@ -1,12 +1,14 @@
import json
import typing as t
import uuid
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

from pydantic import BaseModel, Field, field_serializer
from tqdm.auto import tqdm


class UUIDEncoder(json.JSONEncoder):
@@ -250,67 +252,209 @@ def __repr__(self) -> str:
def __str__(self) -> str:
return self.__repr__()

def get_node_by_id(self, node_id: t.Union[uuid.UUID, str]) -> t.Optional[Node]:
"""
Retrieves a node by its ID.

Parameters
----------
node_id : uuid.UUID or str
    The ID of the node to retrieve; strings are parsed as UUIDs.

Returns
-------
Node or None
The node with the specified ID, or None if not found.
"""
if isinstance(node_id, str):
node_id = uuid.UUID(node_id)

return next(filter(lambda n: n.id == node_id, self.nodes), None)
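# Illustrative usage (hypothetical objects, not part of this diff):
#   node = kg.nodes[0]
#   assert kg.get_node_by_id(str(node.id)) is node   # string form
#   assert kg.get_node_by_id(uuid.uuid4()) is None   # unknown ID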

def find_indirect_clusters(
self,
relationship_condition: t.Callable[[Relationship], bool] = lambda _: True,
depth_limit: int = 3,
) -> t.List[t.Set[Node]]:
"""
Finds indirect clusters of nodes in the knowledge graph based on a relationship condition.
Here if A -> B -> C -> D, then A, B, C, and D form a cluster; if there is also a path
A -> B -> C -> E, it forms a separate cluster. Uses the Leiden algorithm for community
detection, then identifies unique paths within each cluster.

Parameters
----------
relationship_condition : Callable[[Relationship], bool], optional
A function that takes a Relationship and returns a boolean, by default lambda _: True
depth_limit : int, optional
The maximum depth of relationships (number of edges) to consider for clustering, by default 3.

Returns
-------
List[Set[Node]]
A list of sets, where each set contains nodes that form a cluster.
"""
import networkx as nx

def get_node_clusters(
relationships: list[Relationship],
) -> dict[int, set[uuid.UUID]]:
"""Identify clusters of nodes using Leiden algorithm."""
from sknetwork.clustering import Leiden
from sknetwork.data import from_edge_list

# NOTE: the upstream sknetwork Dataset has some issues with type hints,
# so we use type: ignore to bypass them.
# graph: sknetwork.data.Dataset
graph = from_edge_list( # type: ignore
[(str(rel.source.id), str(rel.target.id)) for rel in relationships],
directed=True,
)

# Apply Leiden clustering
leiden = Leiden(random_state=42)
cluster_labels = leiden.fit_predict(graph.adjacency) # type: ignore

# Group nodes by cluster
clusters: defaultdict[int, set[uuid.UUID]] = defaultdict(set)
for label, node_id in zip(cluster_labels, graph.names): # type: ignore
clusters[int(label)].add(uuid.UUID(node_id))

return dict(clusters)
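# Illustrative output (hypothetical relationships): two disconnected
# triangles A->B->C->A and D->E->F->D would come back as two label
# groups, e.g. {0: {A, B, C}, 1: {D, E, F}}.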

def to_nx_digraph(
nodes: set[uuid.UUID], relationships: list[Relationship]
) -> nx.DiGraph:
"""Convert a set of nodes and relationships to a directed graph."""
# Create directed subgraph for this cluster
graph = nx.DiGraph()
for node_id in nodes:
graph.add_node(
node_id,
node_obj=self.get_node_by_id(node_id),
)
for rel in relationships:
if rel.source.id in nodes and rel.target.id in nodes:
graph.add_edge(rel.source.id, rel.target.id, relationship_obj=rel)
return graph
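# The node_obj / relationship_obj attributes let the post-processing at
# the end of this method map path UUIDs back to Node objects without
# extra lookups.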

def max_simple_paths(n: int, k: int = depth_limit) -> int:
"""Estimate the number of paths up to depth_limit that would exist in a fully-connected graph of size cluster_nodes."""
from math import prod

if n - k - 1 <= 0:
return 0

return prod(n - i for i in range(k + 1))
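# Worked example: max_simple_paths(n=10, k=3) == 10 * 9 * 8 * 7 == 5040,
# while max_simple_paths(n=4, k=3) == 0 (since n <= k + 1), which routes
# small clusters to exhaustive search below.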

def exhaustive_paths(
graph: nx.DiGraph, depth_limit: int
) -> list[list[uuid.UUID]]:
"""Find all simple paths in the subgraph up to depth_limit."""
import itertools

# Check if graph has enough nodes for meaningful paths
if len(graph) < 2:
return []

all_paths: list[list[uuid.UUID]] = []
for source, target in itertools.permutations(graph.nodes(), 2):
if not nx.has_path(graph, source, target):
continue
try:
paths = nx.all_simple_paths(
graph,
source,
target,
cutoff=depth_limit,
)
all_paths.extend(paths)
except nx.NetworkXNoPath:
continue

return all_paths
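# Illustrative: for a chain A -> B -> C with depth_limit=3 this yields
# [A, B], [A, B, C], and [B, C].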

def sample_paths_from_graph(
graph: nx.DiGraph, depth_limit: int, sample_size: int = 1000
) -> list[list[uuid.UUID]]:
"""Sample random paths in the graph up to depth_limit."""
# we're using a DiGraph, so we need to account for directionality
# if a node has no out-paths, then it will cause an error in `generate_random_paths`

# Iteratively remove nodes with no out-paths to handle cascading effects
while True:
nodes_with_no_outpaths = [
n for n in graph.nodes() if graph.out_degree(n) == 0
]
if not nodes_with_no_outpaths:
break
graph.remove_nodes_from(nodes_with_no_outpaths)

# Check if graph is empty after node removal
if len(graph) == 0:
return []

sampled_paths: list[list[uuid.UUID]] = []
for depth in range(2, depth_limit + 1):
# Additional safety check before generating paths
if (
len(graph) < depth + 1
): # Need at least depth+1 nodes for a path of length depth
continue

paths = nx.generate_random_paths(
graph,
sample_size=sample_size,
path_length=depth,
)
sampled_paths.extend(paths)
return sampled_paths
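# Note: nx.generate_random_paths performs weighted random walks, so a
# sampled "path" may revisit nodes; revisits collapse when paths are
# reduced to node sets and deduplicated below.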

# depth 2: 3 nodes, 2 edges (A -> B -> C)
if depth_limit < 2:
raise ValueError("Depth limit must be at least 2")

# Filter relationships based on the condition
filtered_relationships: list[Relationship] = []
relationship_map: defaultdict[uuid.UUID, set[uuid.UUID]] = defaultdict(set)
for rel in self.relationships:
if relationship_condition(rel):
filtered_relationships.append(rel)
relationship_map[rel.source.id].add(rel.target.id)
if rel.bidirectional:
relationship_map[rel.target.id].add(rel.source.id)

if not filtered_relationships:
return []

clusters = get_node_clusters(filtered_relationships)

# For each cluster, find valid paths up to depth_limit
cluster_sets: set[frozenset] = set()
for _cluster_label, cluster_nodes in tqdm(
clusters.items(), desc="Processing clusters"
):
if len(cluster_nodes) < depth_limit:
continue

subgraph = to_nx_digraph(
nodes=cluster_nodes, relationships=filtered_relationships
)

sampled_paths: list[list[uuid.UUID]] = []
# if the expected number of paths is small, use exhaustive search
# otherwise sample with random walks
if max_simple_paths(n=len(cluster_nodes), k=depth_limit) < 1000:
sampled_paths.extend(exhaustive_paths(subgraph, depth_limit))
else:
sampled_paths.extend(sample_paths_from_graph(subgraph, depth_limit))

# convert paths (node IDs) to sets of Node objects
# and deduplicate
for path in sampled_paths:
path_nodes = {subgraph.nodes[node_id]["node_obj"] for node_id in path}
cluster_sets.add(frozenset(path_nodes))

return [set(path_nodes) for path_nodes in cluster_sets]

def remove_node(
self, node: Node, inplace: bool = True
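Taken together, a hedged end-to-end sketch of the new flow; KnowledgeGraph, Node, NodeType, and Relationship are the existing ragas classes, while the chunk contents and the "next" relationship type are invented for illustration:

import typing as t

from ragas.testset.graph import KnowledgeGraph, Node, NodeType, Relationship

nodes = [
    Node(type=NodeType.CHUNK, properties={"page_content": f"chunk {i}"})
    for i in range(4)
]
rels = [
    Relationship(source=a, target=b, type="next") for a, b in zip(nodes, nodes[1:])
]
kg = KnowledgeGraph(nodes=nodes, relationships=rels)

# Cluster over "next" edges only; returns a list of sets of Node objects.
clusters: t.List[t.Set[Node]] = kg.find_indirect_clusters(
    relationship_condition=lambda rel: rel.type == "next",
    depth_limit=3,
)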