Skip to content

Commit b00b073

Browse files
committed
moved actor to a different filie name, fixed issue with duplicate state providers returning, added behavior to lifecycle start to catch up with coordinators after being offline
1 parent 8689328 commit b00b073

File tree

8 files changed

+86
-64
lines changed

8 files changed

+86
-64
lines changed

src/koi_net/actor.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from logging import getLogger
2+
from rid_lib.types import KoiNetNode
3+
from rid_lib import RIDType
4+
from koi_net.context import HandlerContext
5+
from koi_net.protocol.api_models import ErrorResponse
6+
from .protocol.event import Event, EventType
7+
8+
9+
logger = getLogger(__name__)
10+
11+
12+
class Actor:
13+
ctx: HandlerContext
14+
15+
def __init__(self):
16+
pass
17+
18+
def set_ctx(self, ctx: HandlerContext):
19+
self.ctx = ctx
20+
21+
def handshake_with(self, target: KoiNetNode):
22+
logger.debug(f"Initiating handshake with {target}")
23+
self.ctx.event_queue.push_event_to(
24+
Event.from_rid(
25+
event_type=EventType.FORGET,
26+
rid=self.ctx.identity.rid),
27+
node=target
28+
)
29+
30+
self.ctx.event_queue.push_event_to(
31+
event=Event.from_bundle(
32+
event_type=EventType.NEW,
33+
bundle=self.ctx.effector.deref(self.ctx.identity.rid)),
34+
node=target
35+
)
36+
37+
self.ctx.event_queue.flush_webhook_queue(target)
38+
39+
def identify_coordinators(self):
40+
return self.ctx.resolver.get_state_providers(KoiNetNode)
41+
42+
def catch_up_with(self, target: KoiNetNode, rid_types: list[RIDType] = []):
43+
logger.debug(f"catching up with {target} on {rid_types or 'all types'}")
44+
45+
payload = self.ctx.request_handler.fetch_manifests(
46+
node=target,
47+
rid_types=rid_types
48+
)
49+
if type(payload) == ErrorResponse:
50+
logger.debug("failed to reach node")
51+
return
52+
53+
for manifest in payload.manifests:
54+
if manifest.rid == self.ctx.identity.rid:
55+
continue
56+
57+
self.ctx.handle(
58+
manifest=manifest,
59+
source=target
60+
)

src/koi_net/context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from rid_lib.ext import Cache
2+
3+
from koi_net.network.resolver import NetworkResolver
24
from .config import NodeConfig
35
from .effector import Effector
46
from .network.graph import NetworkGraph
@@ -28,6 +30,7 @@ class HandlerContext:
2830
event_queue: NetworkEventQueue
2931
graph: NetworkGraph
3032
request_handler: RequestHandler
33+
resolver: NetworkResolver
3134
effector: Effector
3235
_processor: ProcessorInterface | None
3336

@@ -39,6 +42,7 @@ def __init__(
3942
event_queue: NetworkEventQueue,
4043
graph: NetworkGraph,
4144
request_handler: RequestHandler,
45+
resolver: NetworkResolver,
4246
effector: Effector
4347
):
4448
self.identity = identity
@@ -47,6 +51,7 @@ def __init__(
4751
self.event_queue = event_queue
4852
self.graph = graph
4953
self.request_handler = request_handler
54+
self.resolver = resolver
5055
self.effector = effector
5156
self._processor = None
5257

src/koi_net/core.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from .network.request_handler import RequestHandler
88
from .network.response_handler import ResponseHandler
99
from .network.error_handler import ErrorHandler
10-
from .network.behavior import Actor
10+
from .actor import Actor
1111
from .processor.interface import ProcessorInterface
1212
from .processor import default_handlers
1313
from .processor.handler import KnowledgeHandler
@@ -110,11 +110,7 @@ def __init__(
110110
effector=self.effector
111111
)
112112

113-
self.actor = (ActorOverride or Actor)(
114-
identity=self.identity,
115-
effector=self.effector,
116-
event_queue=self.event_queue
117-
)
113+
self.actor = (ActorOverride or Actor)()
118114

119115
# pull all handlers defined in default_handlers module
120116
if handlers is None:
@@ -137,6 +133,7 @@ def __init__(
137133
event_queue=self.event_queue,
138134
graph=self.graph,
139135
request_handler=self.request_handler,
136+
resolver=self.resolver,
140137
effector=self.effector
141138
)
142139

@@ -167,14 +164,15 @@ def __init__(
167164
self.effector.set_resolver(self.resolver)
168165
self.effector.set_action_context(self.action_context)
169166

167+
self.actor.set_ctx(self.handler_context)
168+
170169
self.lifecycle = (NodeLifecycleOverride or NodeLifecycle)(
171170
config=self.config,
172171
identity=self.identity,
173172
graph=self.graph,
174173
processor=self.processor,
175174
effector=self.effector,
176175
actor=self.actor,
177-
handler_context=self.handler_context,
178176
use_kobj_processor_thread=use_kobj_processor_thread
179177
)
180178

src/koi_net/lifecycle.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import logging
22
from contextlib import contextmanager, asynccontextmanager
33

4-
from koi_net.context import HandlerContext
4+
from rid_lib.types import KoiNetNode
55

6-
from .network.behavior import Actor
6+
from .actor import Actor
77
from .effector import Effector
88
from .config import NodeConfig
99
from .processor.interface import ProcessorInterface
@@ -28,7 +28,6 @@ def __init__(
2828
processor: ProcessorInterface,
2929
effector: Effector,
3030
actor: Actor,
31-
handler_context: HandlerContext,
3231
use_kobj_processor_thread: bool
3332
):
3433
self.config = config
@@ -37,7 +36,6 @@ def __init__(
3736
self.processor = processor
3837
self.effector = effector
3938
self.actor = actor
40-
self.handler_context = handler_context
4139
self.use_kobj_processor_thread = use_kobj_processor_thread
4240

4341
@contextmanager
@@ -89,8 +87,11 @@ def start(self):
8987
logger.debug(f"I don't have any neighbors, reaching out to first contact {self.config.koi_net.first_contact.rid!r}")
9088

9189
self.actor.handshake_with(self.config.koi_net.first_contact.rid)
92-
93-
90+
91+
for coordinator in self.actor.identify_coordinators():
92+
self.actor.catch_up_with(coordinator, rid_types=[KoiNetNode])
93+
94+
9495
def stop(self):
9596
"""Stops a node, call this method last.
9697

src/koi_net/network/behavior.py

Lines changed: 0 additions & 42 deletions
This file was deleted.

src/koi_net/network/error_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from koi_net.protocol.event import EventType
44
from rid_lib.types import KoiNetNode
55
from ..processor.interface import ProcessorInterface
6-
from ..network.behavior import Actor
6+
from ..actor import Actor
77

88
logger = getLogger(__name__)
99

src/koi_net/network/graph.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,15 @@ def get_edges(
5757
"""Returns edges this node belongs to.
5858
5959
All edges returned by default, specify `direction` to restrict to incoming or outgoing edges only."""
60-
60+
6161
edges = []
6262
if direction != "in" and self.dg.out_edges:
6363
out_edges = self.dg.out_edges(self.identity.rid)
64-
edges.extend([e for e in out_edges])
64+
edges.extend(out_edges)
6565

6666
if direction != "out" and self.dg.in_edges:
6767
in_edges = self.dg.in_edges(self.identity.rid)
68-
edges.extend([e for e in in_edges])
68+
edges.extend(in_edges)
6969

7070
edge_rids = []
7171
for edge in edges:
@@ -87,7 +87,7 @@ def get_neighbors(
8787
8888
All neighboring nodes returned by default, specify `direction` to restrict to neighbors connected by incoming or outgoing edges only."""
8989

90-
neighbors = []
90+
neighbors = set()
9191
for edge_rid in self.get_edges(direction):
9292
edge_bundle = self.cache.read(edge_rid)
9393

@@ -96,17 +96,17 @@ def get_neighbors(
9696
continue
9797

9898
edge_profile = edge_bundle.validate_contents(EdgeProfile)
99-
99+
100100
if status and edge_profile.status != status:
101101
continue
102102

103103
if allowed_type and allowed_type not in edge_profile.rid_types:
104104
continue
105105

106106
if edge_profile.target == self.identity.rid:
107-
neighbors.append(edge_profile.source)
107+
neighbors.add(edge_profile.source)
108108
elif edge_profile.source == self.identity.rid:
109-
neighbors.append(edge_profile.target)
109+
neighbors.add(edge_profile.target)
110110

111111
return list(neighbors)
112112

src/koi_net/network/resolver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(
4949
def get_state_providers(self, rid_type: RIDType) -> list[KoiNetNode]:
5050
"""Returns list of node RIDs which provide state for the specified RID type."""
5151

52-
logger.debug(f"Looking for state providers of '{rid_type}'")
52+
logger.debug(f"Looking for state providers of {rid_type}")
5353
provider_nodes = []
5454
for node_rid in self.cache.list_rids(rid_types=[KoiNetNode]):
5555
if node_rid == self.identity.rid:

0 commit comments

Comments
 (0)