22from contextlib import contextmanager , asynccontextmanager
33
44from rid_lib .ext import Bundle
5+ from rid_lib .types import KoiNetNode
56
67from .sync_manager import SyncManager
78from .handshaker import Handshaker
@@ -90,31 +91,29 @@ def start(self):
9091 self .event_worker .thread .start ()
9192 self .graph .generate ()
9293
93- # refresh to reflect changes (if any) in config.yaml
94-
94+ # refresh to reflect changes (if any) in config.yaml node profile
9595 self .kobj_queue .push (bundle = Bundle .generate (
9696 rid = self .identity .rid ,
9797 contents = self .identity .profile .model_dump ()
9898 ))
9999
100- log .debug ("Waiting for kobj queue to empty" )
101100 self .kobj_queue .q .join ()
102101
103- if self .sync_manager .catch_up_with_coordinators ():
104- pass
102+ node_providers = self .graph .get_neighbors (
103+ direction = "in" ,
104+ allowed_type = [KoiNetNode ]
105+ )
106+
107+ if node_providers :
108+ log .debug (f"Catching up with `orn:koi-net.node` providers: { node_providers } " )
109+ self .sync_manager .catch_up_with (node_providers , [KoiNetNode ])
105110
106111 elif self .config .koi_net .first_contact .rid :
107- log .debug (f"I don't have any edges with coordinators, reaching out to first contact { self .config .koi_net .first_contact .rid !r} " )
108-
112+ log .debug (f"No edges with `orn:koi-net.node` providers, reaching out to first contact { self .config .koi_net .first_contact .rid !r} " )
109113 self .handshaker .handshake_with (self .config .koi_net .first_contact .rid )
110-
111-
114+
112115 def stop (self ):
113- """Stops a node.
114-
115- Finishes processing knowledge object queue.
116- """
117- log .info (f"Waiting for kobj queue to empty ({ self .kobj_queue .q .unfinished_tasks } tasks remaining)" )
116+ """Stops a node, send stop signals to workers."""
118117
119118 self .kobj_queue .q .put (STOP_WORKER )
120119 self .event_queue .q .put (STOP_WORKER )
0 commit comments