1- import logging
2- from rich .logging import RichHandler
3- from pydantic import Field
41from rid_lib .types import KoiNetNode , KoiNetEdge
5- from koi_net .config import NodeConfig , KoiNetConfig
6- from koi_net .protocol .node import NodeProfile , NodeProvides , NodeType
7- from koi_net import NodeInterface
8- from koi_net .context import HandlerContext
9- from koi_net .processor .handler import HandlerType
2+ import structlog
3+ from koi_net .config .full_node import (
4+ FullNodeConfig ,
5+ ServerConfig ,
6+ KoiNetConfig ,
7+ NodeProfile ,
8+ NodeProvides
9+ )
10+ from koi_net .core import FullNode
11+ from koi_net .processor .context import HandlerContext
12+ from koi_net .processor .handler import HandlerType , KnowledgeHandler
1013from koi_net .processor .knowledge_object import KnowledgeObject
1114from koi_net .protocol .event import Event , EventType
1215from koi_net .protocol .edge import EdgeType , generate_edge_bundle
1316
14- logging .basicConfig (
15- level = logging .INFO ,
16- format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ,
17- datefmt = "%Y-%m-%d %H:%M:%S" ,
18- handlers = [RichHandler ()]
19- )
17+ log = structlog .stdlib .get_logger ()
2018
21- logging .getLogger ("koi_net" ).setLevel (logging .DEBUG )
22- logger = logging .getLogger (__name__ )
2319
24- class CoordinatorConfig (NodeConfig ):
25- koi_net : KoiNetConfig = Field (default_factory = lambda :
26- KoiNetConfig (
27- node_name = "coordinator" ,
28- node_profile = NodeProfile (
29- node_type = NodeType .FULL ,
30- provides = NodeProvides (
31- event = [KoiNetNode , KoiNetEdge ],
32- state = [KoiNetNode , KoiNetEdge ]
33- )
34- ),
35- cache_directory_path = ".coordinator_rid_cache" ,
36- event_queues_path = "coordinator_event_queues.json" ,
37- private_key_pem_path = "coordinator_priv_key.pem"
38- )
20+ class CoordinatorConfig (FullNodeConfig ):
21+ server : ServerConfig = ServerConfig (port = 8080 )
22+ koi_net : KoiNetConfig = KoiNetConfig (
23+ node_name = "coordinator" ,
24+ node_profile = NodeProfile (
25+ provides = NodeProvides (
26+ event = [KoiNetNode , KoiNetEdge ],
27+ state = [KoiNetNode , KoiNetEdge ]
28+ )
29+ ),
30+ rid_types_of_interest = [KoiNetNode , KoiNetEdge ]
3931 )
40-
41- node = NodeInterface (
42- config = CoordinatorConfig .load_from_yaml ("coordinator_config.yaml" ),
43- use_kobj_processor_thread = True
44- )
4532
46- @node .processor .pipeline .register_handler (HandlerType .Network , rid_types = [KoiNetNode ])
33+ @KnowledgeHandler .create (
34+ HandlerType .Network ,
35+ rid_types = [KoiNetNode ])
4736def handshake_handler (ctx : HandlerContext , kobj : KnowledgeObject ):
48- logger .info ("Handling node handshake" )
37+ log .info ("Handling node handshake" )
4938
5039 # only respond if node declares itself as NEW
5140 if kobj .event_type != EventType .NEW :
5241 return
5342
54- logger .info ("Sharing this node's bundle with peer" )
55- identity_bundle = ctx .effector . deref (ctx .identity .rid )
56- ctx .event_queue .push_event_to (
43+ log .info ("Sharing this node's bundle with peer" )
44+ identity_bundle = ctx .cache . read (ctx .identity .rid )
45+ ctx .event_queue .push (
5746 event = Event .from_bundle (EventType .NEW , identity_bundle ),
58- node = kobj .rid ,
59- flush = True
47+ target = kobj .rid
6048 )
6149
62- logger .info ("Proposing new edge" )
50+ log .info ("Proposing new edge" )
6351 # defer handling of proposed edge
6452
6553 edge_bundle = generate_edge_bundle (
@@ -69,8 +57,12 @@ def handshake_handler(ctx: HandlerContext, kobj: KnowledgeObject):
6957 rid_types = [KoiNetNode , KoiNetEdge ]
7058 )
7159
72- ctx .handle (rid = edge_bundle .rid , event_type = EventType .FORGET )
73- ctx .handle (bundle = edge_bundle )
74-
60+ ctx .kobj_queue .push (rid = edge_bundle .rid , event_type = EventType .FORGET )
61+ ctx .kobj_queue .push (bundle = edge_bundle )
62+
63+ class CoordinatorNode (FullNode ):
64+ config_schema = CoordinatorConfig
65+ knowledge_handlers = FullNode .knowledge_handlers + [handshake_handler ]
66+
7567if __name__ == "__main__" :
76- node . server .run ()
68+ CoordinatorNode () .run ()
0 commit comments