Skip to content

Commit ad984aa

Browse files
authored
Merge pull request #13 from BlockScience/refactor
v1.1.0-beta.2
2 parents 7a4e03f + 1af1acf commit ad984aa

File tree

9 files changed

+371
-183
lines changed

9 files changed

+371
-183
lines changed

examples/basic_coordinator_node.py

Lines changed: 0 additions & 111 deletions
This file was deleted.

examples/coordinator.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import logging
2+
from rich.logging import RichHandler
3+
from pydantic import Field
4+
from rid_lib.types import KoiNetNode, KoiNetEdge
5+
from koi_net.config import NodeConfig, KoiNetConfig
6+
from koi_net.protocol.node import NodeProfile, NodeProvides, NodeType
7+
from koi_net import NodeInterface
8+
from koi_net.context import HandlerContext
9+
from koi_net.processor.handler import HandlerType
10+
from koi_net.processor.knowledge_object import KnowledgeObject
11+
from koi_net.protocol.event import Event, EventType
12+
from koi_net.protocol.edge import EdgeType, generate_edge_bundle
13+
14+
logging.basicConfig(
15+
level=logging.INFO,
16+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
17+
datefmt="%Y-%m-%d %H:%M:%S",
18+
handlers=[RichHandler()]
19+
)
20+
21+
logging.getLogger("koi_net").setLevel(logging.DEBUG)
22+
logger = logging.getLogger(__name__)
23+
24+
class CoordinatorConfig(NodeConfig):
25+
koi_net: KoiNetConfig = Field(default_factory = lambda:
26+
KoiNetConfig(
27+
node_name="coordinator",
28+
node_profile=NodeProfile(
29+
node_type=NodeType.FULL,
30+
provides=NodeProvides(
31+
event=[KoiNetNode, KoiNetEdge],
32+
state=[KoiNetNode, KoiNetEdge]
33+
)
34+
),
35+
cache_directory_path=".coordinator_rid_cache",
36+
event_queues_path="coordinator_event_queues.json",
37+
private_key_pem_path="coordinator_priv_key.pem"
38+
)
39+
)
40+
41+
node = NodeInterface(
42+
config=CoordinatorConfig.load_from_yaml("coordinator_config.yaml"),
43+
use_kobj_processor_thread=True
44+
)
45+
46+
@node.processor.pipeline.register_handler(HandlerType.Network, rid_types=[KoiNetNode])
47+
def handshake_handler(ctx: HandlerContext, kobj: KnowledgeObject):
48+
logger.info("Handling node handshake")
49+
50+
# only respond if node declares itself as NEW
51+
if kobj.event_type != EventType.NEW:
52+
return
53+
54+
logger.info("Sharing this node's bundle with peer")
55+
identity_bundle = ctx.effector.deref(ctx.identity.rid)
56+
ctx.event_queue.push_event_to(
57+
event=Event.from_bundle(EventType.NEW, identity_bundle),
58+
node=kobj.rid,
59+
flush=True
60+
)
61+
62+
logger.info("Proposing new edge")
63+
# defer handling of proposed edge
64+
65+
edge_bundle = generate_edge_bundle(
66+
source=kobj.rid,
67+
target=ctx.identity.rid,
68+
edge_type=EdgeType.WEBHOOK,
69+
rid_types=[KoiNetNode, KoiNetEdge]
70+
)
71+
72+
ctx.handle(rid=edge_bundle.rid, event_type=EventType.FORGET)
73+
ctx.handle(bundle=edge_bundle)
74+
75+
if __name__ == "__main__":
76+
node.server.run()
Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import time
21
import logging
32
from pydantic import Field
43
from rich.logging import RichHandler
54
from koi_net import NodeInterface
65
from koi_net.protocol.node import NodeProfile, NodeType
7-
from koi_net.config import NodeConfig, KoiNetConfig, NodeContact
6+
from koi_net.config import NodeConfig, KoiNetConfig
87

98
logging.basicConfig(
109
level=logging.INFO,
@@ -24,25 +23,15 @@ class PartialNodeConfig(NodeConfig):
2423
node_profile=NodeProfile(
2524
node_type=NodeType.PARTIAL
2625
),
27-
cache_directory_path=".basic_partial_rid_cache",
28-
event_queues_path="basic_partial_event_queues.json"
26+
cache_directory_path=".partial_rid_cache",
27+
event_queues_path="partial_event_queues.json",
28+
private_key_pem_path="partial_priv_key.pem"
2929
)
3030
)
3131

32-
3332
node = NodeInterface(
34-
config=PartialNodeConfig.load_from_yaml("basic_partial_config.yaml")
33+
config=PartialNodeConfig.load_from_yaml("partial_config.yaml")
3534
)
3635

37-
38-
node.start()
39-
40-
while True:
41-
neighbors = node.resolver.poll_neighbors()
42-
for node_rid in neighbors:
43-
events = neighbors[node_rid]
44-
for event in events:
45-
node.processor.handle(event=event, source=node_rid)
46-
node.processor.flush_kobj_queue()
47-
48-
time.sleep(5)
36+
if __name__ == "__main__":
37+
node.poller.run()

pyproject.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "koi-net"
7-
version = "1.1.0-beta.1"
7+
version = "1.1.0-beta.2"
88
description = "Implementation of KOI-net protocol in Python"
99
authors = [
1010
{name = "Luke Miller", email = "[email protected]"}
@@ -19,15 +19,15 @@ dependencies = [
1919
"pydantic>=2.10.6",
2020
"ruamel.yaml>=0.18.10",
2121
"python-dotenv>=1.1.0",
22-
"cryptography>=45.0.3"
22+
"cryptography>=45.0.3",
23+
"fastapi>=0.115.12",
24+
"uvicorn>=0.34.2"
2325
]
2426

2527
[project.optional-dependencies]
2628
dev = ["twine>=6.0", "build"]
2729
examples = [
2830
"rich",
29-
"fastapi",
30-
"uvicorn"
3131
]
3232

3333
[project.urls]

src/koi_net/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class KoiNetConfig(BaseModel):
2929
cache_directory_path: str = ".rid_cache"
3030
event_queues_path: str = "event_queues.json"
3131
private_key_pem_path: str = "priv_key.pem"
32+
polling_interval: int = 5
3233

3334
first_contact: NodeContact = Field(default_factory=NodeContact)
3435

src/koi_net/core.py

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
import httpx
2+
from koi_net.protocol.node import NodeType
33
from rid_lib.ext import Cache
44
from .network.resolver import NetworkResolver
55
from .network.event_queue import NetworkEventQueue
@@ -17,6 +17,9 @@
1717
from .config import NodeConfig
1818
from .context import HandlerContext, ActionContext
1919
from .effector import Effector
20+
from .server import NodeServer
21+
from .lifecycle import NodeLifecycle
22+
from .poller import NodePoller
2023
from . import default_actions
2124

2225
logger = logging.getLogger(__name__)
@@ -32,26 +35,24 @@ class NodeInterface:
3235
graph: NetworkGraph
3336
processor: ProcessorInterface
3437
secure: Secure
38+
server: NodeServer
3539

3640
use_kobj_processor_thread: bool
3741

3842
def __init__(
39-
self,
43+
self,
4044
config: NodeConfig,
4145
use_kobj_processor_thread: bool = False,
42-
4346
handlers: list[KnowledgeHandler] | None = None,
44-
4547
cache: Cache | None = None,
4648
processor: ProcessorInterface | None = None
4749
):
4850
self.config = config
4951
self.cache = cache or Cache(
5052
directory_path=self.config.koi_net.cache_directory_path
5153
)
52-
54+
5355
self.identity = NodeIdentity(config=self.config)
54-
5556
self.effector = Effector(cache=self.cache)
5657

5758
self.graph = NetworkGraph(
@@ -146,47 +147,30 @@ def __init__(
146147
self.effector.set_processor(self.processor)
147148
self.effector.set_resolver(self.resolver)
148149
self.effector.set_action_context(self.action_context)
149-
150-
151-
def start(self) -> None:
152-
"""Starts a node, call this method first.
153-
154-
Starts the processor thread (if enabled). Loads event queues into memory. Generates network graph from nodes and edges in cache. Processes any state changes of node bundle. Initiates handshake with first contact (if provided) if node doesn't have any neighbors.
155-
"""
156-
if self.use_kobj_processor_thread:
157-
logger.info("Starting processor worker thread")
158-
self.processor.worker_thread.start()
159-
160-
# self.network._load_event_queues()
161-
self.graph.generate()
162-
163-
# refresh to reflect changes (if any) in config.yaml
164-
self.effector.deref(self.identity.rid, refresh_cache=True)
165-
166-
logger.debug("Waiting for kobj queue to empty")
167-
if self.use_kobj_processor_thread:
168-
self.processor.kobj_queue.join()
169-
else:
170-
self.processor.flush_kobj_queue()
171-
logger.debug("Done")
172-
173-
if not self.graph.get_neighbors() and self.config.koi_net.first_contact.rid:
174-
logger.debug(f"I don't have any neighbors, reaching out to first contact {self.config.koi_net.first_contact.rid!r}")
175-
176-
self.actor.handshake_with(self.config.koi_net.first_contact.rid)
177-
178-
179-
def stop(self):
180-
"""Stops a node, call this method last.
181-
182-
Finishes processing knowledge object queue. Saves event queues to storage.
183-
"""
184-
logger.info("Stopping node...")
185-
186-
if self.use_kobj_processor_thread:
187-
logger.info(f"Waiting for kobj queue to empty ({self.processor.kobj_queue.unfinished_tasks} tasks remaining)")
188-
self.processor.kobj_queue.join()
189-
else:
190-
self.processor.flush_kobj_queue()
191-
192-
# self.network._save_event_queues()
150+
151+
self.lifecycle = NodeLifecycle(
152+
config=self.config,
153+
identity=self.identity,
154+
graph=self.graph,
155+
processor=self.processor,
156+
effector=self.effector,
157+
actor=self.actor,
158+
use_kobj_processor_thread=use_kobj_processor_thread
159+
)
160+
161+
# if self.config.koi_net.node_profile.node_type == NodeType.FULL:
162+
self.server = NodeServer(
163+
config=self.config,
164+
lifecycle=self.lifecycle,
165+
secure=self.secure,
166+
processor=self.processor,
167+
event_queue=self.event_queue,
168+
response_handler=self.response_handler
169+
)
170+
171+
self.poller = NodePoller(
172+
processor=self.processor,
173+
lifecycle=self.lifecycle,
174+
resolver=self.resolver,
175+
config=self.config
176+
)

0 commit comments

Comments
 (0)