1- import structlog
1+ from dataclasses import dataclass
22from typing import Callable
33from enum import StrEnum
4+
5+ import structlog
46from rid_lib .ext import Cache , Bundle
57from rid_lib .core import RID , RIDType
68from rid_lib .types import KoiNetNode
9+
10+ from .processor .context import HandlerContext
711from .network .resolver import NetworkResolver
812from .processor .kobj_queue import KobjQueue
9- from .identity import NodeIdentity
1013
1114log = structlog .stdlib .get_logger ()
1215
13-
14- class ActionContext :
15- """Provides action handlers access to other subsystems."""
16+ @dataclass
17+ class DerefHandler :
18+ func : Callable [[HandlerContext , RID ], Bundle | None ]
19+ rid_types : tuple [RIDType ]
1620
17- identity : NodeIdentity
18-
19- def __init__ (
20- self ,
21- identity : NodeIdentity ,
22- ):
23- self .identity = identity
21+ def __call__ (self , ctx : HandlerContext , rid : RID ) -> Bundle | None :
22+ return self .func (ctx , rid )
2423
24+ @classmethod
25+ def create (cls , rid_types : tuple [RIDType ]):
26+ def decorator (func : Callable ) -> DerefHandler :
27+ handler = cls (func , rid_types )
28+ return handler
29+ return decorator
30+
2531
2632class BundleSource (StrEnum ):
2733 CACHE = "CACHE"
@@ -33,50 +39,23 @@ class Effector:
3339 cache : Cache
3440 resolver : NetworkResolver
3541 kobj_queue : KobjQueue
36- action_context : ActionContext
37- _action_table : dict [
38- type [RID ],
39- Callable [
40- [ActionContext , RID ],
41- Bundle | None
42- ]
43- ] = dict ()
42+ handler_context : HandlerContext
4443
4544 def __init__ (
4645 self ,
4746 cache : Cache ,
4847 resolver : NetworkResolver ,
4948 kobj_queue : KobjQueue ,
50- identity : NodeIdentity
49+ handler_context : HandlerContext ,
50+ deref_handlers : list [DerefHandler ]
5151 ):
5252 self .cache = cache
5353 self .resolver = resolver
5454 self .kobj_queue = kobj_queue
55- self .action_context = ActionContext (identity )
56- self ._action_table = self .__class__ ._action_table .copy ()
57-
58- @classmethod
59- def register_default_action (cls , rid_type : RIDType ):
60- def decorator (func : Callable ) -> Callable :
61- cls ._action_table [rid_type ] = func
62- return func
63- return decorator
64-
65- def register_action (self , rid_type : RIDType ):
66- """Registers a new dereference action for an RID type.
55+ self .handler_context = handler_context
56+ self .deref_handlers = deref_handlers
6757
68- Example:
69- This function should be used as a decorator on an action function::
70-
71- @node.register_action(KoiNetNode)
72- def deref_koi_net_node(ctx: ActionContext, rid: KoiNetNode):
73- # return a Bundle or None
74- return
75- """
76- def decorator (func : Callable ) -> Callable :
77- self ._action_table [rid_type ] = func
78- return func
79- return decorator
58+ self .handler_context .set_effector (self )
8059
8160 def _try_cache (self , rid : RID ) -> tuple [Bundle , BundleSource ] | None :
8261 bundle = self .cache .read (rid )
@@ -87,26 +66,27 @@ def _try_cache(self, rid: RID) -> tuple[Bundle, BundleSource] | None:
8766 else :
8867 log .debug ("Cache miss" )
8968 return None
90-
69+
9170 def _try_action (self , rid : RID ) -> tuple [Bundle , BundleSource ] | None :
92- if type (rid ) not in self ._action_table :
93- log .debug ("No action available" )
71+ action = None
72+ for handler in self .deref_handlers :
73+ if type (rid ) not in handler .rid_types :
74+ continue
75+ action = handler
76+ break
77+
78+ if not action :
79+ log .debug ("No action found" )
9480 return None
9581
96- log .debug ("Action available" )
97- func = self ._action_table [type (rid )]
98- bundle = func (
99- ctx = self .action_context ,
100- rid = rid
101- )
82+ bundle = action (ctx = self .handler_context , rid = rid )
10283
10384 if bundle :
10485 log .debug ("Action hit" )
10586 return bundle , BundleSource .ACTION
10687 else :
10788 log .debug ("Action miss" )
10889 return None
109-
11090
11191 def _try_network (self , rid : RID ) -> tuple [Bundle , KoiNetNode ] | None :
11292 bundle , source = self .resolver .fetch_remote_bundle (rid )
@@ -118,13 +98,13 @@ def _try_network(self, rid: RID) -> tuple[Bundle, KoiNetNode] | None:
11898 log .debug ("Network miss" )
11999 return None
120100
121-
122101 def deref (
123102 self ,
124103 rid : RID ,
125104 refresh_cache : bool = False ,
126105 use_network : bool = False ,
127- handle_result : bool = True
106+ handle_result : bool = True ,
107+ write_through : bool = False
128108 ) -> Bundle | None :
129109 """Dereferences an RID.
130110
@@ -136,7 +116,7 @@ def deref(
136116 rid: RID to dereference
137117 refresh_cache: skips cache read when `True`
138118 use_network: enables fetching from other nodes when `True`
139- handle_result: handles resulting bundle with knowledge pipeline when `True`
119+ handle_result: sends resulting bundle to kobj queue when `True`
140120 """
141121
142122 log .debug (f"Dereferencing { rid !r} " )
@@ -159,6 +139,9 @@ def deref(
159139 bundle = bundle ,
160140 source = source if type (source ) is KoiNetNode else None
161141 )
142+
143+ if write_through :
144+ self .kobj_queue .q .join ()
162145
163146 # TODO: refactor for general solution, param to write through to cache before continuing
164147 # like `self.processor.kobj_queue.join()``
0 commit comments