diff --git a/src/hio/help/__init__.py b/src/hio/help/__init__.py index 722769e2..6a3872ed 100644 --- a/src/hio/help/__init__.py +++ b/src/hio/help/__init__.py @@ -14,7 +14,7 @@ # initialize global ogler at hio.help.ogler always instantiated by default ogler = ogling.initOgler(prefix='hio') # init only runs once on import -from .decking import Deck +from .decking import Deck, TrackedDeck, CueBox from .hicting import Hict, Mict from .timing import Timer, MonoTimer, TimerError, RetroTimerError diff --git a/src/hio/help/decking.py b/src/hio/help/decking.py index 77148684..bb8b878e 100644 --- a/src/hio/help/decking.py +++ b/src/hio/help/decking.py @@ -1,13 +1,15 @@ # -*- encoding: utf-8 -*- """ -keri.help.decking module +hio.help.decking module -Support for Deck class +Support for Deck, TrackedDeck, and CueBox classes """ +import time from collections import deque from typing import Any + class Deck(deque): """ Extends deque to support deque access convenience methods .push and .pull @@ -93,3 +95,327 @@ def pull(self, emptive=False): if not emptive: raise return None + + +class TrackedDeck(Deck): + """ + Extends Deck with optional cap, lifecycle statistics, and back-pressure. + + Drop-in replacement for Deck. All Deck methods work unchanged. + Adds cap enforcement, push/pull/drop counters, and high water mark + tracking to diagnose unbounded growth and imbalanced producer/consumer + patterns. + + When cap is set and the deck is full, push() silently drops the + element and increments the drop counter. This prevents unbounded + memory growth in producer-faster-than-consumer scenarios. + + Parameters: + cap (int): optional maximum number of elements. None means unbounded. + Unlike deque's maxlen which discards from the opposite end, + TrackedDeck's cap drops the incoming element (back-pressure). + owner (str): optional identifier for this deck (for diagnostics) + + Local methods (in addition to Deck methods): + .stats = property returning dict of lifecycle statistics + .pushed = total successful pushes + .pulled = total successful pulls + .dropped = total elements dropped due to cap + .highwater = maximum observed deck depth + + Example: + + deck = TrackedDeck(cap=100, owner="witness_cues") + deck.push("event_1") + deck.push("event_2") + deck.pull() + assert deck.stats == { + "pending": 1, + "pushed": 2, + "pulled": 1, + "dropped": 0, + "highwater": 2, + "owner": "witness_cues", + } + + """ + + def __init__(self, iterable=(), cap=None, owner=""): + """ + Initialize TrackedDeck. + + Parameters: + iterable: initial elements (passed to deque) + cap (int): optional maximum size. None means unbounded. + owner (str): optional identifier for diagnostics + """ + super().__init__(iterable) + self._cap = cap + self._owner = owner + self._pushed = 0 + self._pulled = 0 + self._dropped = 0 + self._highwater = len(self) + + def __repr__(self): + """ + Custom repr for TrackedDeck + """ + itemreprs = repr(list(self)) + return ("TrackedDeck({0})".format(itemreprs)) + + def push(self, elem: Any): + """ + If not None, add elem to right side of deque, Otherwise ignore. + If cap is set and deck is at capacity, drop element and increment + drop counter instead of appending (back-pressure). + + Parameters: + elem (Any): element to be appended to deck (deque) + """ + if elem is None: + return + if self._cap is not None and len(self) >= self._cap: + self._dropped += 1 + return + super().push(elem) + self._pushed += 1 + if len(self) > self._highwater: + self._highwater = len(self) + + def pull(self, emptive=False): + """ + Remove and return elem from left side of deque. + Tracks pull count on successful removal. + + Parameters: + emptive (bool): True means return None instead of raise IndexError + """ + result = super().pull(emptive=emptive) + if result is not None: + self._pulled += 1 + return result + + @property + def cap(self): + """Maximum capacity or None if unbounded""" + return self._cap + + @property + def owner(self): + """Owner identifier for diagnostics""" + return self._owner + + @property + def pushed(self): + """Total successful pushes""" + return self._pushed + + @property + def pulled(self): + """Total successful pulls""" + return self._pulled + + @property + def dropped(self): + """Total elements dropped due to cap""" + return self._dropped + + @property + def highwater(self): + """Maximum observed deck depth""" + return self._highwater + + @property + def stats(self): + """ + Lifecycle statistics as a dict. + + Returns dict with keys: + pending: current number of elements + pushed: total successful pushes + pulled: total successful pulls + dropped: total elements dropped due to cap + highwater: maximum observed deck depth + owner: owner identifier + """ + return dict( + pending=len(self), + pushed=self._pushed, + pulled=self._pulled, + dropped=self._dropped, + highwater=self._highwater, + owner=self._owner, + ) + + +class CueBox: + """ + Two-phase cue lifecycle manager: claim -> resolve | reject. + + Prevents the re-push livelock pattern where unhandled cues bounce + between Doers indefinitely. A cue must be explicitly claimed before + processing and either resolved (handled) or rejected (returned to + pending with retry limit). + + Parameters: + cap (int): optional cap for the internal pending TrackedDeck + maxRetries (int): maximum times a cue can be rejected before + being permanently dropped. Default 3. + leakTimeout (float): seconds after which a claimed but unresolved + cue is considered leaked. Default 30.0. + + Local methods: + .push(cue) = add cue to pending queue + .claim(claimer) = remove from pending, mark as claimed. + Returns (cueId, cue) or None. + .resolve(cueId) = mark cue as handled (removes from claimed) + .reject(cueId) = return cue to pending (respects maxRetries) + .expireLeaked() = find and expire timed-out claimed cues + .stats = property returning lifecycle statistics + + Example: + + box = CueBox(cap=256, maxRetries=3) + box.push({"type": "receipt", "said": "EABC..."}) + + claim = box.claim("WitnessPublisher") + if claim is not None: + cueId, cue = claim + try: + handle(cue) + box.resolve(cueId) + except CannotHandle: + box.reject(cueId) # returns to pending if retries remain + """ + + def __init__(self, cap=None, maxRetries=3, leakTimeout=30.0): + """ + Initialize CueBox. + + Parameters: + cap (int): optional cap for pending queue + maxRetries (int): max reject cycles before permanent drop + leakTimeout (float): seconds before claimed cue is leaked + """ + self._pending = TrackedDeck(cap=cap, owner="CueBox.pending") + self._claimed = {} # cueId -> (cue, claimer, claimedAt, retries) + self._retryCounts = {} # id(cue) -> cumulative reject count + self._maxRetries = maxRetries + self._leakTimeout = leakTimeout + self._resolved = 0 + self._rejected = 0 + self._expired = 0 + + def push(self, cue: Any): + """ + Add a cue to the pending queue. + + Parameters: + cue: the cue element (must not be None) + """ + self._pending.push(cue) + + def claim(self, claimer=""): + """ + Remove next cue from pending and mark as claimed. + + Preserves cumulative retry count across reject/re-push cycles + so that maxRetries is enforced over the full lifecycle of a cue. + + Parameters: + claimer (str): identifier of the claiming Doer + + Returns: + tuple: (cueId, cue) if a cue was available, None otherwise + """ + cue = self._pending.pull(emptive=True) + if cue is None: + return None + cueId = id(cue) + retries = self._retryCounts.get(cueId, 0) + self._claimed[cueId] = (cue, claimer, time.monotonic(), retries) + return (cueId, cue) + + def resolve(self, cueId): + """ + Mark cue as successfully handled. Removes from claimed. + + Parameters: + cueId: the cue identifier returned by claim() + """ + entry = self._claimed.pop(cueId, None) + if entry is not None: + self._resolved += 1 + self._retryCounts.pop(cueId, None) + + def reject(self, cueId): + """ + Return cue to pending queue. Respects maxRetries to prevent + indefinite re-push livelock. + + Retry count is tracked cumulatively across reject/re-push/re-claim + cycles via identity tracking. If the cue has been rejected + maxRetries times, it is permanently dropped instead of returned + to pending. + + Parameters: + cueId: the cue identifier returned by claim() + """ + entry = self._claimed.pop(cueId, None) + if entry is None: + return + cue, claimer, claimedAt, retries = entry + self._rejected += 1 + retries += 1 + if retries < self._maxRetries: + self._retryCounts[id(cue)] = retries + self._pending.push(cue) + else: + self._retryCounts.pop(id(cue), None) + + def expireLeaked(self): + """ + Find and expire cues that were claimed but never resolved or + rejected within leakTimeout seconds. + + Returns: + list: list of (cueId, cue) tuples that were expired + """ + now = time.monotonic() + leaked = [] + for cueId in list(self._claimed): + cue, claimer, claimedAt, retries = self._claimed[cueId] + if now - claimedAt > self._leakTimeout: + self._claimed.pop(cueId) + self._expired += 1 + leaked.append((cueId, cue)) + return leaked + + @property + def stats(self): + """ + Lifecycle statistics as a dict. + + Returns dict with keys: + pending: current pending count + claimed: currently claimed count + resolved: total resolved count + rejected: total rejected count + expired: total expired (leaked) count + """ + return dict( + pending=len(self._pending), + claimed=len(self._claimed), + resolved=self._resolved, + rejected=self._rejected, + expired=self._expired, + ) + + def __len__(self): + """Total cues in flight (pending + claimed)""" + return len(self._pending) + len(self._claimed) + + def __bool__(self): + """True if any cues pending or claimed""" + return bool(self._pending) or bool(self._claimed) diff --git a/tests/help/test_decking.py b/tests/help/test_decking.py index ab37399c..77924bc9 100644 --- a/tests/help/test_decking.py +++ b/tests/help/test_decking.py @@ -3,9 +3,11 @@ tests.help.test_decking module """ +import time import pytest -from hio.help import Deck +from hio.help import Deck, TrackedDeck, CueBox + def test_deck(): """ @@ -83,5 +85,346 @@ def test_deck(): """End Test""" +def test_tracked_deck_basic(): + """ + Test TrackedDeck basic push/pull behavior (backward compat with Deck) + """ + deck = TrackedDeck() + assert len(deck) == 0 + assert not deck + + # push/pull same as Deck + deck.push("A") + deck.push("B") + assert deck.pull() == "A" + assert deck.pull(emptive=True) == "B" + assert not deck + + # None filtered + deck.push(None) + assert not deck + + # emptive pull on empty + assert deck.pull(emptive=True) is None + + # non-emptive pull raises + with pytest.raises(IndexError): + deck.pull() + + # falsy values allowed + deck.extend([False, "", []]) + stuff = [] + while (x := deck.pull(emptive=True)) is not None: + stuff.append(x) + assert stuff == [False, "", []] + assert not deck + + # repr + deck.push("X") + assert "TrackedDeck" in repr(deck) + + """End Test""" + + +def test_tracked_deck_cap(): + """ + Test TrackedDeck cap enforcement (back-pressure) + """ + deck = TrackedDeck(cap=3, owner="test") + assert deck.cap == 3 + assert deck.owner == "test" + + # Fill to cap + deck.push("A") + deck.push("B") + deck.push("C") + assert len(deck) == 3 + assert deck.pushed == 3 + assert deck.dropped == 0 + + # Exceed cap - element is dropped + deck.push("D") + assert len(deck) == 3 # still 3 + assert deck.dropped == 1 + assert deck.pushed == 3 # D was not pushed + + # More drops + deck.push("E") + deck.push("F") + assert deck.dropped == 3 + + # Pull makes room + assert deck.pull() == "A" + deck.push("G") + assert len(deck) == 3 + assert deck.pushed == 4 # G was pushed + assert deck.dropped == 3 + + """End Test""" + + +def test_tracked_deck_stats(): + """ + Test TrackedDeck lifecycle statistics + """ + deck = TrackedDeck(cap=5, owner="witness_cues") + + for i in range(8): + deck.push(i) + + assert deck.pushed == 5 + assert deck.dropped == 3 + assert deck.highwater == 5 + + deck.pull() + deck.pull() + assert deck.pulled == 2 + + s = deck.stats + assert s["pending"] == 3 + assert s["pushed"] == 5 + assert s["pulled"] == 2 + assert s["dropped"] == 3 + assert s["highwater"] == 5 + assert s["owner"] == "witness_cues" + + """End Test""" + + +def test_tracked_deck_unbounded(): + """ + Test TrackedDeck with no cap (unbounded, same behavior as Deck) + """ + deck = TrackedDeck() + assert deck.cap is None + + # Push many elements - no drops + for i in range(1000): + deck.push(i) + + assert len(deck) == 1000 + assert deck.pushed == 1000 + assert deck.dropped == 0 + assert deck.highwater == 1000 + + """End Test""" + + +def test_tracked_deck_initial_elements(): + """ + Test TrackedDeck with initial iterable + """ + deck = TrackedDeck(["A", "B", "C"]) + assert len(deck) == 3 + assert deck.highwater == 3 + assert deck.pushed == 0 # initial elements don't count as pushes + assert deck.pull() == "A" + assert deck.pulled == 1 + + """End Test""" + + +def test_tracked_deck_deque_compat(): + """ + Test TrackedDeck inherits all deque methods + """ + deck = TrackedDeck() + deck.append("x") # direct deque method + deck.push("y") # Deck method + assert len(deck) == 2 + assert deck.popleft() == "x" # direct deque method + assert deck.pull() == "y" # Deck method + + deck.extend(["A", "B", "C"]) + assert "B" in deck + deck.clear() + assert not deck + + """End Test""" + + +def test_cuebox_claim_resolve(): + """ + Test CueBox claim and resolve lifecycle + """ + box = CueBox() + + # Empty claim returns None + assert box.claim("doer_a") is None + + # Push and claim + box.push("cue1") + box.push("cue2") + assert len(box) == 2 + + claim = box.claim("doer_a") + assert claim is not None + cueId, cue = claim + assert cue == "cue1" + assert box.stats["claimed"] == 1 + assert box.stats["pending"] == 1 + + # Resolve + box.resolve(cueId) + assert box.stats["resolved"] == 1 + assert box.stats["claimed"] == 0 + + # Resolve unknown id is no-op + box.resolve(99999) + assert box.stats["resolved"] == 1 + + """End Test""" + + +def test_cuebox_claim_reject(): + """ + Test CueBox reject returns cue to pending + """ + box = CueBox(maxRetries=2) + + box.push("cue1") + claim = box.claim("doer_a") + cueId, cue = claim + assert cue == "cue1" + + # Reject returns to pending + box.reject(cueId) + assert box.stats["rejected"] == 1 + assert box.stats["pending"] == 1 + assert box.stats["claimed"] == 0 + + # Can re-claim + claim2 = box.claim("doer_b") + assert claim2 is not None + assert claim2[1] == "cue1" + + """End Test""" + + +def test_cuebox_max_retries(): + """ + Test CueBox maxRetries prevents indefinite re-push livelock + """ + box = CueBox(maxRetries=2) + + box.push("stubborn_cue") + + # First reject — retries becomes 1, still < 2 so re-pushed + claim = box.claim("doer") + assert claim is not None + box.reject(claim[0]) + assert box.stats["pending"] == 1 # back in pending + + # Second reject — retries becomes 2, equals maxRetries so dropped + claim = box.claim("doer") + assert claim is not None + box.reject(claim[0]) + assert box.stats["pending"] == 0 # permanently dropped + + # Nothing left to claim + claim = box.claim("doer") + assert claim is None + + # Verify the livelock pattern terminates + box2 = CueBox(maxRetries=3) + box2.push("livelock_cue") + cycles = 0 + while box2: + claim = box2.claim("doer") + if claim is None: + break + box2.reject(claim[0]) + cycles += 1 + if cycles > 100: + break # safety + assert cycles == 3 # exactly maxRetries rejects then dropped + assert not box2 # empty + + """End Test""" + + +def test_cuebox_leak_detection(): + """ + Test CueBox expireLeaked detects abandoned claims + """ + box = CueBox(leakTimeout=0.01) # 10ms for testing + + box.push("leaky") + claim = box.claim("forgetful_doer") + assert claim is not None + assert len(box) == 1 # 1 claimed + + time.sleep(0.02) # exceed timeout + + leaked = box.expireLeaked() + assert len(leaked) == 1 + assert leaked[0][1] == "leaky" + assert box.stats["expired"] == 1 + assert box.stats["claimed"] == 0 + assert len(box) == 0 + + """End Test""" + + +def test_cuebox_stats(): + """ + Test CueBox stats reporting + """ + box = CueBox(cap=10) + + box.push("a") + box.push("b") + box.push("c") + + claim = box.claim("x") + box.resolve(claim[0]) + + claim = box.claim("y") + box.reject(claim[0]) + + s = box.stats + assert s["pending"] == 2 # c + rejected b + assert s["claimed"] == 0 + assert s["resolved"] == 1 + assert s["rejected"] == 1 + + """End Test""" + + +def test_cuebox_bool_len(): + """ + Test CueBox __bool__ and __len__ + """ + box = CueBox() + assert not box + assert len(box) == 0 + + box.push("x") + assert box + assert len(box) == 1 + + claim = box.claim("d") + assert box # claimed counts + assert len(box) == 1 + + box.resolve(claim[0]) + assert not box + assert len(box) == 0 + + """End Test""" + + if __name__ == "__main__": test_deck() + test_tracked_deck_basic() + test_tracked_deck_cap() + test_tracked_deck_stats() + test_tracked_deck_unbounded() + test_tracked_deck_initial_elements() + test_tracked_deck_deque_compat() + test_cuebox_claim_resolve() + test_cuebox_claim_reject() + test_cuebox_max_retries() + test_cuebox_leak_detection() + test_cuebox_stats() + test_cuebox_bool_len()