-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgraph.py
More file actions
89 lines (72 loc) · 2.86 KB
/
graph.py
File metadata and controls
89 lines (72 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""NetworkX graph construction from Dict, node/edge schema, serialization to JSON."""
from __future__ import annotations
import json
from typing import Any
import networkx as nx
# Key prefixes for entries in the ephemeral Dict that the resolvers and the
# orchestrator write into. build_from_dict() below reads the first two.
NODE_PREFIX = "node_"  # node_<entity_key> -> node dict
EDGES_BATCH_PREFIX = "edges_batch_"  # edges_batch_<uuid> -> list of edge dicts
SEEN_PREFIX = "seen_"  # not read by the code in this module; presumably used by resolvers — confirm
def build_from_dict(d: dict[str, Any]) -> dict[str, Any]:
    """
    Assemble a JSON-serializable graph payload from the ephemeral Dict.

    Recognised entries:
      * ``node_<entity_key>``  -> node dict, normalized by ``_normalize_node``
        with ``<entity_key>`` as the fallback id.
      * ``edges_batch_<uuid>`` -> list of edge dicts; only dict items that
        carry both ``source`` and ``target`` are normalized by ``_normalize_edge``.
    Entries with any other key, or a value of the wrong type, are ignored.

    Nodes are de-duplicated on their normalized ``id`` (first one seen wins).
    An edge survives only if both endpoints are ids of collected nodes and it
    is not a self-loop.

    Returns ``{"nodes": [...], "edges": [...]}``.
    """
    node_list: list[dict[str, Any]] = []
    pending_edges: list[dict[str, Any]] = []
    known_ids: set[str] = set()

    for entry_key, entry in d.items():
        if entry_key.startswith(NODE_PREFIX) and isinstance(entry, dict):
            candidate = _normalize_node(entry_key[len(NODE_PREFIX):], entry)
            if candidate["id"] in known_ids:
                continue  # duplicate id: keep the earlier node
            known_ids.add(candidate["id"])
            node_list.append(candidate)
        elif entry_key.startswith(EDGES_BATCH_PREFIX) and isinstance(entry, list):
            pending_edges.extend(
                _normalize_edge(raw_edge)
                for raw_edge in entry
                if isinstance(raw_edge, dict) and "source" in raw_edge and "target" in raw_edge
            )

    # Edges are filtered only after the full scan, because a batch may be
    # visited before the node entries it refers to.
    def _is_attached(edge: dict[str, Any]) -> bool:
        return (
            edge["source"] in known_ids
            and edge["target"] in known_ids
            and edge["source"] != edge["target"]
        )

    return {"nodes": node_list, "edges": list(filter(_is_attached, pending_edges))}
def _normalize_node(node_id: str, raw: dict[str, Any]) -> dict[str, Any]:
"""Ensure node has id, type, value, metadata, depth."""
return {
"id": raw.get("id", node_id),
"type": raw.get("type", "username"),
"value": raw.get("value", ""),
"metadata": raw.get("metadata", {}),
"depth": raw.get("depth", 0),
}
def _normalize_edge(raw: dict[str, Any]) -> dict[str, Any]:
"""Ensure edge has source, target, relationship, confidence."""
return {
"source": raw["source"],
"target": raw["target"],
"relationship": raw.get("relationship", "linked_to"),
"confidence": raw.get("confidence", 1.0),
}
def to_networkx(nodes: list[dict], edges: list[dict]) -> nx.DiGraph:
    """Convert node/edge lists into a ``networkx.DiGraph`` (for optional analysis).

    Node attributes: ``type`` and ``value`` (None if absent), ``metadata``
    (default ``{}``) and ``depth`` (default ``0``), keyed by the node's ``id``.
    Edge attributes: ``relationship`` (default ``"linked_to"``) and
    ``confidence`` (default ``1.0``).

    All nodes are added before any edge. An edge whose endpoint is not in
    *nodes* is still added; networkx then creates that endpoint without
    attributes.
    """
    graph = nx.DiGraph()
    for node in nodes:
        attrs = {
            "type": node.get("type"),
            "value": node.get("value"),
            "metadata": node.get("metadata", {}),
            "depth": node.get("depth", 0),
        }
        graph.add_node(node["id"], **attrs)
    for edge in edges:
        source, target = edge["source"], edge["target"]
        graph.add_edge(
            source,
            target,
            relationship=edge.get("relationship", "linked_to"),
            confidence=edge.get("confidence", 1.0),
        )
    return graph
def serialize_graph(graph_payload: dict[str, Any]) -> str:
    """Encode a graph payload as a compact JSON string.

    Objects the ``json`` module cannot encode natively are converted with
    ``str()`` instead of raising ``TypeError``. Other options keep the
    ``json`` defaults (ASCII-escaped output, no indentation).
    """
    encoder = json.JSONEncoder(default=str)
    return encoder.encode(graph_payload)