Skip to content

Commit ef7010f

Browse files
authored
Merge pull request #14 from BlockScience/refactor
v1.1.0-beta.3
2 parents ad984aa + b856105 commit ef7010f

File tree

6 files changed

+72
-51
lines changed

6 files changed

+72
-51
lines changed

examples/partial.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
class PartialNodeConfig(NodeConfig):
20-
koi_net: KoiNetConfig | None = Field(default_factory = lambda:
20+
koi_net: KoiNetConfig = Field(default_factory = lambda:
2121
KoiNetConfig(
2222
node_name="partial",
2323
node_profile=NodeProfile(
@@ -34,4 +34,4 @@ class PartialNodeConfig(NodeConfig):
3434
)
3535

3636
if __name__ == "__main__":
37-
node.poller.run()
37+
node.lifecycle.start()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "koi-net"
7-
version = "1.1.0-beta.2"
7+
version = "1.1.0-beta.3"
88
description = "Implementation of KOI-net protocol in Python"
99
authors = [
1010
{name = "Luke Miller", email = "[email protected]"}

src/koi_net/core.py

Lines changed: 53 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from koi_net.protocol.node import NodeType
2+
from typing import Generic, TypeVar
33
from rid_lib.ext import Cache
44
from .network.resolver import NetworkResolver
55
from .network.event_queue import NetworkEventQueue
@@ -25,9 +25,10 @@
2525
logger = logging.getLogger(__name__)
2626

2727

28+
T = TypeVar("T", bound=NodeConfig)
2829

29-
class NodeInterface:
30-
config: NodeConfig
30+
class NodeInterface(Generic[T]):
31+
config: T
3132
cache: Cache
3233
identity: NodeIdentity
3334
resolver: NetworkResolver
@@ -41,58 +42,75 @@ class NodeInterface:
4142

4243
def __init__(
4344
self,
44-
config: NodeConfig,
45+
config: T,
4546
use_kobj_processor_thread: bool = False,
4647
handlers: list[KnowledgeHandler] | None = None,
47-
cache: Cache | None = None,
48-
processor: ProcessorInterface | None = None
48+
49+
CacheOverride: type[Cache] | None = None,
50+
NodeIdentityOverride: type[NodeIdentity] | None = None,
51+
EffectorOverride: type[Effector] | None = None,
52+
NetworkGraphOverride: type[NetworkGraph] | None = None,
53+
SecureOverride: type[Secure] | None = None,
54+
RequestHandlerOverride: type[RequestHandler] | None = None,
55+
ResponseHandlerOverride: type[ResponseHandler] | None = None,
56+
NetworkResolverOverride: type[NetworkResolver] | None = None,
57+
NetworkEventQueueOverride: type[NetworkEventQueue] | None = None,
58+
ActorOverride: type[Actor] | None = None,
59+
ActionContextOverride: type[ActionContext] | None = None,
60+
HandlerContextOverride: type[HandlerContext] | None = None,
61+
KnowledgePipelineOverride: type[KnowledgePipeline] | None = None,
62+
ProcessorInterfaceOverride: type[ProcessorInterface] | None = None,
63+
ErrorHandlerOverride: type[ErrorHandler] | None = None,
64+
NodeLifecycleOverride: type[NodeLifecycle] | None = None,
65+
NodeServerOverride: type[NodeServer] | None = None,
66+
NodePollerOverride: type[NodePoller] | None = None,
4967
):
5068
self.config = config
51-
self.cache = cache or Cache(
69+
self.cache = (CacheOverride or Cache)(
5270
directory_path=self.config.koi_net.cache_directory_path
5371
)
5472

55-
self.identity = NodeIdentity(config=self.config)
56-
self.effector = Effector(cache=self.cache)
73+
self.identity = (NodeIdentityOverride or NodeIdentity)(config=self.config)
74+
self.effector = (EffectorOverride or Effector)(cache=self.cache)
5775

58-
self.graph = NetworkGraph(
59-
cache=self.cache,
76+
self.graph = (NetworkGraphOverride or NetworkGraph)(
77+
cache=self.cache,
6078
identity=self.identity
6179
)
62-
63-
self.secure = Secure(
64-
identity=self.identity,
65-
effector=self.effector,
80+
81+
self.secure = (SecureOverride or Secure)(
82+
identity=self.identity,
83+
effector=self.effector,
6684
config=self.config
6785
)
68-
69-
self.request_handler = RequestHandler(
70-
effector=self.effector,
86+
87+
self.request_handler = (RequestHandlerOverride or RequestHandler)(
88+
effector=self.effector,
7189
identity=self.identity,
7290
secure=self.secure
7391
)
74-
75-
self.response_handler = ResponseHandler(self.cache, self.effector)
76-
77-
self.resolver = NetworkResolver(
92+
93+
self.response_handler = (ResponseHandlerOverride or ResponseHandler)(self.cache, self.effector)
94+
95+
self.resolver = (NetworkResolverOverride or NetworkResolver)(
7896
config=self.config,
79-
cache=self.cache,
97+
cache=self.cache,
8098
identity=self.identity,
8199
graph=self.graph,
82100
request_handler=self.request_handler,
83101
effector=self.effector
84102
)
85-
86-
self.event_queue = NetworkEventQueue(
103+
104+
self.event_queue = (NetworkEventQueueOverride or NetworkEventQueue)(
87105
config=self.config,
88-
cache=self.cache,
106+
cache=self.cache,
89107
identity=self.identity,
90108
graph=self.graph,
91109
request_handler=self.request_handler,
92110
effector=self.effector
93111
)
94112

95-
self.actor = Actor(
113+
self.actor = (ActorOverride or Actor)(
96114
identity=self.identity,
97115
effector=self.effector,
98116
event_queue=self.event_queue
@@ -107,12 +125,12 @@ def __init__(
107125

108126
self.use_kobj_processor_thread = use_kobj_processor_thread
109127

110-
self.action_context = ActionContext(
128+
self.action_context = (ActionContextOverride or ActionContext)(
111129
identity=self.identity,
112130
effector=self.effector
113131
)
114132

115-
self.handler_context = HandlerContext(
133+
self.handler_context = (HandlerContextOverride or HandlerContext)(
116134
identity=self.identity,
117135
cache=self.cache,
118136
event_queue=self.event_queue,
@@ -121,7 +139,7 @@ def __init__(
121139
effector=self.effector
122140
)
123141

124-
self.pipeline = KnowledgePipeline(
142+
self.pipeline = (KnowledgePipelineOverride or KnowledgePipeline)(
125143
handler_context=self.handler_context,
126144
cache=self.cache,
127145
request_handler=self.request_handler,
@@ -130,12 +148,12 @@ def __init__(
130148
default_handlers=handlers
131149
)
132150

133-
self.processor = processor or ProcessorInterface(
151+
self.processor = (ProcessorInterfaceOverride or ProcessorInterface)(
134152
pipeline=self.pipeline,
135153
use_kobj_processor_thread=self.use_kobj_processor_thread
136154
)
137155

138-
self.error_handler = ErrorHandler(
156+
self.error_handler = (ErrorHandlerOverride or ErrorHandler)(
139157
processor=self.processor,
140158
actor=self.actor
141159
)
@@ -148,7 +166,7 @@ def __init__(
148166
self.effector.set_resolver(self.resolver)
149167
self.effector.set_action_context(self.action_context)
150168

151-
self.lifecycle = NodeLifecycle(
169+
self.lifecycle = (NodeLifecycleOverride or NodeLifecycle)(
152170
config=self.config,
153171
identity=self.identity,
154172
graph=self.graph,
@@ -159,7 +177,7 @@ def __init__(
159177
)
160178

161179
# if self.config.koi_net.node_profile.node_type == NodeType.FULL:
162-
self.server = NodeServer(
180+
self.server = (NodeServerOverride or NodeServer)(
163181
config=self.config,
164182
lifecycle=self.lifecycle,
165183
secure=self.secure,
@@ -168,7 +186,7 @@ def __init__(
168186
response_handler=self.response_handler
169187
)
170188

171-
self.poller = NodePoller(
189+
self.poller = (NodePollerOverride or NodePoller)(
172190
processor=self.processor,
173191
lifecycle=self.lifecycle,
174192
resolver=self.resolver,

src/koi_net/network/response_handler.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22
from rid_lib import RID
3+
from rid_lib.types import KoiNetNode
34
from rid_lib.ext import Manifest, Cache
45
from rid_lib.ext.bundle import Bundle
56

@@ -30,13 +31,13 @@ def __init__(
3031
self.cache = cache
3132
self.effector = effector
3233

33-
def fetch_rids(self, req: FetchRids) -> RidsPayload:
34+
def fetch_rids(self, req: FetchRids, source: KoiNetNode) -> RidsPayload:
3435
logger.info(f"Request to fetch rids, allowed types {req.rid_types}")
3536
rids = self.cache.list_rids(req.rid_types)
3637

3738
return RidsPayload(rids=rids)
3839

39-
def fetch_manifests(self, req: FetchManifests) -> ManifestsPayload:
40+
def fetch_manifests(self, req: FetchManifests, source: KoiNetNode) -> ManifestsPayload:
4041
logger.info(f"Request to fetch manifests, allowed types {req.rid_types}, rids {req.rids}")
4142

4243
manifests: list[Manifest] = []
@@ -51,7 +52,7 @@ def fetch_manifests(self, req: FetchManifests) -> ManifestsPayload:
5152

5253
return ManifestsPayload(manifests=manifests, not_found=not_found)
5354

54-
def fetch_bundles(self, req: FetchBundles) -> BundlesPayload:
55+
def fetch_bundles(self, req: FetchBundles, source: KoiNetNode) -> BundlesPayload:
5556
logger.info(f"Request to fetch bundles, requested rids {req.rids}")
5657

5758
bundles: list[Bundle] = []

src/koi_net/poller.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,19 @@ def __init__(
2222
self.resolver = resolver
2323
self.config = config
2424

25+
def poll(self):
26+
neighbors = self.resolver.poll_neighbors()
27+
for node_rid in neighbors:
28+
for event in neighbors[node_rid]:
29+
self.processor.handle(event=event, source=node_rid)
30+
self.processor.flush_kobj_queue()
31+
2532
def run(self):
2633
try:
2734
self.lifecycle.start()
2835
while True:
2936
start_time = time.time()
30-
neighbors = self.resolver.poll_neighbors()
31-
for node_rid in neighbors:
32-
for event in neighbors[node_rid]:
33-
self.processor.handle(event=event, source=node_rid)
34-
self.processor.flush_kobj_queue()
35-
37+
self.poll()
3638
elapsed = time.time() - start_time
3739
sleep_time = self.config.koi_net.polling_interval - elapsed
3840
if sleep_time > 0:

src/koi_net/server.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,20 @@ async def poll_events(
109109
self, req: SignedEnvelope[PollEvents]
110110
) -> SignedEnvelope[EventsPayload] | ErrorResponse:
111111
logger.info(f"Request to {POLL_EVENTS_PATH}")
112-
events = self.event_queue.flush_poll_queue(req.payload.rid)
112+
events = self.event_queue.flush_poll_queue(req.source_node)
113113
return EventsPayload(events=events)
114114

115115
async def fetch_rids(
116116
self, req: SignedEnvelope[FetchRids]
117117
) -> SignedEnvelope[RidsPayload] | ErrorResponse:
118-
return self.response_handler.fetch_rids(req.payload)
118+
return self.response_handler.fetch_rids(req.payload, req.source_node)
119119

120120
async def fetch_manifests(
121121
self, req: SignedEnvelope[FetchManifests]
122122
) -> SignedEnvelope[ManifestsPayload] | ErrorResponse:
123-
return self.response_handler.fetch_manifests(req.payload)
123+
return self.response_handler.fetch_manifests(req.payload, req.source_node)
124124

125125
async def fetch_bundles(
126126
self, req: SignedEnvelope[FetchBundles]
127127
) -> SignedEnvelope[BundlesPayload] | ErrorResponse:
128-
return self.response_handler.fetch_bundles(req.payload)
128+
return self.response_handler.fetch_bundles(req.payload, req.source_node)

0 commit comments

Comments
 (0)