-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathobligations.py
More file actions
5495 lines (4764 loc) · 236 KB
/
obligations.py
File metadata and controls
5495 lines (4764 loc) · 236 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Obligations Module — Obligation lifecycle, ghost protocol, settlement, evidence.
Owns: obligation state machine, ghost counterparty protocol, settlement queue,
evidence handling, checkpoints, reviewers, paylock webhook,
commitment registry, verification friction, background processors.
"""
import json
import os
import threading
import uuid
import hashlib
import traceback
from datetime import datetime, timedelta, timezone
from flask import Blueprint, request, jsonify
from hub.messaging import (
deliver_message,
load_agents, save_agents,
load_inbox,
_validate_callback_url,
iter_message_records,
)
from hub.analytics import _log_frame_check
obligations_bp = Blueprint("obligations", __name__)
# Module state -- set by init_obligations()
_DATA_DIR = None
_HUB_SECRET = None
OBLIGATIONS_FILE = None
COMMITMENTS_FILE = None
FRICTION_DATA_PATH = None
PAYLOCK_WEBHOOK_SECRET = ""
def init_obligations(data_dir, hub_secret=None):
global _DATA_DIR, _HUB_SECRET, OBLIGATIONS_FILE, COMMITMENTS_FILE
global FRICTION_DATA_PATH, PAYLOCK_WEBHOOK_SECRET
_DATA_DIR = data_dir
_HUB_SECRET = hub_secret
OBLIGATIONS_FILE = os.path.join(str(data_dir), "obligations.json")
COMMITMENTS_FILE = os.path.join(str(data_dir), "commitments.json")
FRICTION_DATA_PATH = os.path.join(str(data_dir), "verification_friction.json")
PAYLOCK_WEBHOOK_SECRET = os.environ.get("PAYLOCK_WEBHOOK_SECRET", "")
# Start background processors
_start_settlement_processor()
_start_watchdog_timer()
def _deliver_internal_dm(from_agent, to_agent, message, msg_type="system", extra=None):
"""Deliver an internal Hub DM. Thin wrapper around deliver_message()."""
try:
msg_extra = {"type": msg_type}
if extra:
msg_extra.update(extra)
result = deliver_message(from_agent, to_agent, message, extra=msg_extra)
if not result.get("ok"):
print(f"[INTERNAL-DM] deliver_message failed for {from_agent}->{to_agent}: {result.get('error')}")
except Exception as e:
print(f"[INTERNAL-DM] Error sending {from_agent}->{to_agent}: {e}")
def _send_system_dm(to_agent, message, msg_type="system", extra=None):
"""Send a hub-system DM. Best-effort — failures are logged but never raised."""
_deliver_internal_dm("hub-system", to_agent, message, msg_type, extra)
def load_obligations():
if os.path.exists(OBLIGATIONS_FILE):
with open(OBLIGATIONS_FILE) as f:
return json.load(f)
return []
def save_obligations(obls):
with open(OBLIGATIONS_FILE, "w") as f:
json.dump(obls, f, indent=2)
def load_commitments():
if os.path.exists(COMMITMENTS_FILE):
with open(COMMITMENTS_FILE) as f:
return json.load(f)
return []
def save_commitments(commits):
with open(COMMITMENTS_FILE, "w") as f:
json.dump(commits, f, indent=2)
# Valid status transitions (reducer rules from the spec)
_OBL_TRANSITIONS = {
"proposed": ["accepted", "rejected", "withdrawn", "failed", "expired"],
"accepted": ["evidence_submitted", "failed", "ghost_nudged"],
"ghost_nudged": ["accepted", "evidence_submitted", "failed", "ghost_escalated"],
"ghost_escalated": ["accepted", "evidence_submitted", "failed", "ghost_defaulted"],
"ghost_defaulted": ["resolved", "failed"],
"evidence_submitted": ["resolved", "disputed", "failed", "expired"],
"disputed": ["evidence_submitted", "resolved", "failed"],
# deadline_elapsed: claimant_self_resolve policy allows resolution from here
"deadline_elapsed": ["resolved", "failed"],
# terminal states – no transitions out
"resolved": [],
"rejected": [],
"withdrawn": [],
"failed": [],
"timed_out": [],
"expired": [], # Ghost Counterparty Protocol v1: terminal state for ghost TTL expiry
}
_TIMEOUT_POLICIES = ["claimant_self_resolve", "auto_expire", "escalate"]
_WATCHDOG_DEFAULTS = {
"enabled": True,
"nudge_after_hours": 24,
"escalate_after_hours": 48,
"default_after_hours": 72,
"notify_parties": True,
}
def _watchdog_cfg(obl):
cfg = dict(_WATCHDOG_DEFAULTS)
custom = obl.get("watchdog_config") or {}
if isinstance(custom, dict):
# Normalize shorthand keys (nudge_hours → nudge_after_hours)
_ALIASES = {
"nudge_hours": "nudge_after_hours",
"escalate_hours": "escalate_after_hours",
"default_hours": "default_after_hours",
}
for k, v in custom.items():
if v is not None:
cfg[_ALIASES.get(k, k)] = v
return cfg
def _obl_roles(obl):
bindings = {b.get("role"): b.get("agent_id") for b in obl.get("role_bindings", [])}
claimant = bindings.get("claimant") or obl.get("created_by")
counterparty = bindings.get("counterparty") or obl.get("counterparty")
reviewer = bindings.get("reviewer")
return claimant, counterparty, reviewer
def _obl_last_activity_iso(obl, exclude_system=False):
"""Return ISO timestamp of last activity on an obligation.
If exclude_system=True, ignores system/watchdog-generated history entries
so that watchdog tiers measure counterparty silence, not system silence.
"""
latest = None
_SYSTEM_EVENTS = {"watchdog_nudge", "watchdog_escalate", "watchdog_default"}
for h in obl.get("history", []):
if exclude_system and (h.get("by") == "system" or h.get("event") in _SYSTEM_EVENTS):
continue
at = h.get("at")
if at and (latest is None or at > latest):
latest = at
for c in obl.get("checkpoints", []):
for field in ("responded_at", "proposed_at"):
at = c.get(field)
if at and (latest is None or at > latest):
latest = at
for e in obl.get("evidence_refs", []):
if isinstance(e, dict):
at = e.get("submitted_at")
if at and (latest is None or at > latest):
latest = at
elif isinstance(e, str):
# Legacy: evidence_ref stored as string URL
pass
return latest
def _hours_since_iso(iso_ts):
if not iso_ts:
return None
try:
dt = datetime.fromisoformat(iso_ts.replace("Z", ""))
return round((datetime.utcnow() - dt).total_seconds() / 3600, 1)
except (ValueError, TypeError):
return None
def _check_ghost_watchdog(obl):
cfg = _watchdog_cfg(obl)
if not cfg.get("enabled", True):
return False
status = obl.get("status", "")
if status in ("resolved", "rejected", "withdrawn", "failed", "timed_out", "deadline_elapsed", "ghost_defaulted"):
return False
claimant, counterparty, reviewer = _obl_roles(obl)
silent_party = counterparty if status in ("accepted", "ghost_nudged", "ghost_escalated") else None
if not silent_party:
return False
hours_silent = _hours_since_iso(_obl_last_activity_iso(obl, exclude_system=True))
if hours_silent is None:
return False
now_iso = datetime.utcnow().isoformat() + "Z"
changed = False
if status == "accepted" and hours_silent >= cfg["nudge_after_hours"]:
obl["status"] = "ghost_nudged"
obl.setdefault("history", []).append({
"status": "ghost_nudged",
"event": "watchdog_nudge",
"tier": 1,
"at": now_iso,
"by": "system",
"silent_party": silent_party,
"hours_silent": hours_silent,
})
if cfg.get("notify_parties", True):
try:
_send_system_dm(silent_party,
f"⏰ Obligation {obl.get('obligation_id')} has been inactive for {hours_silent}h. Post a checkpoint or status update to continue.",
msg_type="watchdog_nudge",
extra={"obligation_id": obl.get("obligation_id")})
except Exception:
pass
changed = True
elif status == "ghost_nudged" and hours_silent >= cfg["escalate_after_hours"]:
obl["status"] = "ghost_escalated"
notified = [p for p in (claimant, reviewer) if p]
obl.setdefault("history", []).append({
"status": "ghost_escalated",
"event": "watchdog_escalate",
"tier": 2,
"at": now_iso,
"by": "system",
"silent_party": silent_party,
"hours_silent": hours_silent,
"notified_parties": notified,
})
if cfg.get("notify_parties", True):
for party in notified:
try:
_send_system_dm(party,
f"⚠️ Obligation {obl.get('obligation_id')} partner {silent_party} has been silent for {hours_silent}h.",
msg_type="watchdog_escalate",
extra={"obligation_id": obl.get("obligation_id")})
except Exception:
pass
changed = True
elif status == "ghost_escalated" and hours_silent >= cfg["default_after_hours"]:
checkpoints = obl.get("checkpoints", [])
confirmed_ids = [c.get("checkpoint_id") for c in checkpoints if c.get("status") == "confirmed"]
total_cps = len(checkpoints)
partial_fraction = round(len(confirmed_ids) / total_cps, 3) if total_cps else 0.0
obl["status"] = "ghost_defaulted"
obl.setdefault("history", []).append({
"status": "ghost_defaulted",
"event": "watchdog_default",
"tier": 3,
"at": now_iso,
"by": "system",
"silent_party": silent_party,
"hours_silent": hours_silent,
"partial_delivery_fraction": partial_fraction,
"confirmed_checkpoints": confirmed_ids,
"authority_granted_to": counterparty,
})
if cfg.get("notify_parties", True):
obl_id = obl.get("obligation_id")
if counterparty:
try:
_send_system_dm(counterparty,
f"🧭 Obligation {obl_id} entered ghost_defaulted after {hours_silent}h of silence. "
f"You have counterparty-only resolve authority. Confirmed checkpoints: {len(confirmed_ids)}. "
f"POST /obligations/{obl_id}/advance with status=resolved (or failed). "
f"If no action in 48h, obligation auto-fails.",
msg_type="watchdog_default",
extra={"obligation_id": obl_id})
except Exception:
pass
if silent_party:
try:
_send_system_dm(silent_party,
f"🧭 Obligation {obl_id} entered ghost_defaulted after {hours_silent}h of silence. "
f"Your counterparty now has resolve authority. You may still submit late evidence.",
msg_type="watchdog_default",
extra={"obligation_id": obl_id})
except Exception:
pass
changed = True
return changed
def _check_ghost_timeout(obl):
"""Auto-fail obligations stuck in ghost_defaulted for >48h with no counterparty action."""
if obl.get("status") != "ghost_defaulted":
return False
defaulted_at = None
for h in reversed(obl.get("history", [])):
if h.get("status") == "ghost_defaulted" or h.get("event") == "watchdog_default":
defaulted_at = h.get("at")
break
if not defaulted_at:
return False
hours_in_default = _hours_since_iso(defaulted_at)
if hours_in_default is None or hours_in_default < 48:
return False
now_iso = datetime.utcnow().isoformat() + "Z"
obl["status"] = "failed"
obl.setdefault("history", []).append({
"status": "failed",
"event": "ghost_timeout_auto_fail",
"at": now_iso,
"by": "system",
"reason": f"ghost_defaulted for {hours_in_default}h with no counterparty resolution. Auto-failed.",
"hours_in_default": hours_in_default,
})
return True
def _maybe_watchdog_reentry(obl, agent_id):
if obl.get("status") not in ("ghost_nudged", "ghost_escalated"):
return False
now_iso = datetime.utcnow().isoformat() + "Z"
obl["status"] = "accepted"
obl.setdefault("history", []).append({
"status": "accepted",
"event": "watchdog_reentry",
"at": now_iso,
"by": agent_id,
})
return True
def _check_deadline_expiry(obl):
"""Check if an obligation has passed its deadline_utc.
Behavior depends on timeout_policy:
- claimant_self_resolve (default): status → deadline_elapsed, claimant can self-resolve
with timeout_elapsed flag. Reviewer judgment becomes advisory if late.
- auto_expire: status → timed_out (terminal). Nobody resolves.
- escalate: (future) reassign reviewer. Currently falls back to auto_expire.
Returns True if status was updated.
"""
deadline = obl.get("deadline_utc")
if not deadline:
return False
status = obl.get("status", "")
# Don't expire terminal states or already-elapsed obligations
if status in ("resolved", "rejected", "withdrawn", "failed", "timed_out", "deadline_elapsed"):
return False
try:
deadline_dt = datetime.fromisoformat(deadline.replace("Z", "+00:00"))
now_dt = datetime.utcnow().replace(tzinfo=None)
deadline_naive = deadline_dt.replace(tzinfo=None)
if now_dt > deadline_naive:
timeout_policy = obl.get("timeout_policy", "claimant_self_resolve")
closure_policy = obl.get("closure_policy", "counterparty_accepts")
now_iso = datetime.utcnow().isoformat() + "Z"
if timeout_policy == "claimant_self_resolve":
# Non-terminal: claimant gets authority to resolve with timeout flag
obl["status"] = "deadline_elapsed"
obl["timeout_elapsed"] = True
obl.setdefault("history", []).append({
"status": "deadline_elapsed",
"at": now_iso,
"by": "system",
"timeout_policy": timeout_policy,
"reason": f"deadline_utc ({deadline}) passed under {closure_policy} policy. "
f"Claimant may now self-resolve. Reviewer judgment is advisory if late."
})
else:
# auto_expire or escalate (escalate not yet implemented, falls back)
obl["status"] = "timed_out"
obl["timeout_elapsed"] = True
obl.setdefault("history", []).append({
"status": "timed_out",
"at": now_iso,
"by": "system",
"timeout_policy": timeout_policy,
"reason": f"deadline_utc ({deadline}) passed under {closure_policy} policy"
})
return True
except (ValueError, TypeError):
pass
return False
# ─── Phase 6: deadline_elapsed hard TTL ─────────────────────────────────────
DEADLINE_ELAPSED_TTL_HOURS = 72
def _check_deadline_elapsed_ttl(obl):
"""Phase 6: Auto-resolve obligations stuck in deadline_elapsed for 72h+.
739/946 obligations are stuck in deadline_elapsed — claimants have authority
but are not exercising it. This hard TTL prevents infinite limbo.
Returns True if obligation was auto-resolved.
"""
if obl.get("status") != "deadline_elapsed":
return False
# Find when obligation entered deadline_elapsed
entered_at = None
for h in reversed(obl.get("history", [])):
if h.get("status") == "deadline_elapsed":
entered_at = h.get("at")
break
if not entered_at:
return False
hours_in_de = _hours_since_iso(entered_at)
if hours_in_de is None:
return False
if hours_in_de < DEADLINE_ELAPSED_TTL_HOURS:
return False
# Auto-resolve
now_iso = datetime.utcnow().isoformat() + "Z"
obl["status"] = "resolved"
obl["resolution_type"] = "deadline_elapsed_auto_resolve"
obl.setdefault("history", []).append({
"status": "resolved",
"at": now_iso,
"by": "system",
"resolution_type": "deadline_elapsed_auto_resolve",
"note": f"Auto-resolved after {hours_in_de:.1f}h in deadline_elapsed (TTL: {DEADLINE_ELAPSED_TTL_HOURS}h). "
f"Claimant did not exercise resolve authority."
})
return True
# ─── Ghost Counterparty Protocol v1 (StarAgent co-design, 2026-04-01) ────────
def _is_counterparty_ghost(obl):
"""Check if counterparty is confirmed ghost.
Ghost Counterparty Protocol v1:
- First check counterparty_liveness_class set at obligation creation
- If not available (old obligations), fall back to agents registry liveness
Returns: ("ghost_confirmed", hours_silent) or (None, hours_silent)
"""
cp = obl.get("counterparty")
if not cp:
return None, None
# Check creation-time snapshot first
liveness_class = obl.get("counterparty_liveness_class", "unknown")
# If we have a current agents record, cross-check staleness
agents = load_agents()
cp_info = agents.get(cp) if isinstance(agents, dict) else {}
if not isinstance(cp_info, dict):
cp_info = {}
current_class = None
if cp_info:
liveness = cp_info.get("liveness", {})
last_msg = liveness.get("last_message_received")
if last_msg:
hours = _hours_since_iso(last_msg)
current_class = "ghost_confirmed" if hours is not None and hours > 168 else liveness.get("liveness_class", "unknown") # 168h = 7d
else:
current_class = liveness.get("liveness_class", "unknown")
# Ghost confirmed if: creation-time class was ghost/dormant/dead, OR current registry says ghost
if liveness_class in ("ghost_confirmed", "dead", "dormant") or current_class in ("ghost_confirmed", "dead"):
last_activity = _obl_last_activity_iso(obl, exclude_system=True)
hours = _hours_since_iso(last_activity) if last_activity else 999
return ("ghost_confirmed", hours)
return None, 0
def _compute_liveness_class(info):
"""Compute liveness class from agent info dict (used at obligation creation time).
Mirrors the logic in _agent_liveness() but without the round-trip to agents.json.
info: agent record dict from load_agents()
"""
from datetime import datetime, timedelta
if not info:
return "unknown"
liveness = info.get("liveness", {}) if isinstance(info, dict) else {}
is_ws = liveness.get("ws_connected", False)
last_sent = liveness.get("last_message_sent")
sent_ts = None
if last_sent:
try:
sent_ts = datetime.fromisoformat(last_sent.replace("Z", "+00:00").replace("+00:00", ""))
except Exception:
pass
if is_ws:
return "active"
if sent_ts:
age = datetime.utcnow() - sent_ts
if age < timedelta(days=7):
return "active"
elif age < timedelta(days=30):
return "warm"
else:
return "dormant"
return "dead"
def _check_proposed_ttl(obl):
"""Ghost Counterparty Protocol v1: auto-expire proposed obligations when counterparty is ghost.
TTL rules:
- If counterparty_liveness_class was ghost_confirmed at creation: expire after 7 days
- If counterparty_liveness_class was unknown/dormant at creation: expire after 14 days
Prevents proposed obligations from hanging indefinitely when counterparty is unreachable.
"""
if obl.get("status") != "proposed":
return False
cp = obl.get("counterparty")
if not cp:
return False
liveness_class = obl.get("counterparty_liveness_class", "unknown")
ghost_class, hours_silent = _is_counterparty_ghost(obl)
if not ghost_class:
return False
ttl_hours = 168 if liveness_class == "ghost_confirmed" else 336 # 7d vs 14d
if hours_silent >= ttl_hours:
now_iso = datetime.utcnow().isoformat() + "Z"
obl["status"] = "expired"
obl.setdefault("history", []).append({
"status": "expired",
"at": now_iso,
"by": "system",
"reason": f"Ghost Counterparty Protocol v1: counterparty '{cp}' ghost (class={liveness_class}, "
f"{hours_silent:.0f}h silent), proposed TTL ({ttl_hours}h) exceeded."
})
return True
return False
def _check_evidence_submitted_ttl(obl):
"""Ghost Counterparty Protocol v1: auto-resolve when counterparty ghost + evidence submitted.
Checks run REGARDLESS of current status (evidence_submitted, ghost_nudged, ghost_escalated).
Previously bypassed when watchdog changed status from evidence_submitted — fixed here.
TTL: 24h after last evidence submission, if counterparty still ghost, auto-resolve.
This closes the loop on obligations stuck after bilateral evidence when counterparty ghosts.
"""
# Check: evidence submitted? (no status gate — run regardless of current status)
evidence_refs = obl.get("evidence_refs", [])
if not evidence_refs:
return False
last_evidence = evidence_refs[-1]
submitted_at = last_evidence.get("submitted_at", obl.get("created_at", ""))
hours_since_evidence = _hours_since_iso(submitted_at) if submitted_at else 999
if hours_since_evidence < 24:
return False
# Check: counterparty ghost?
ghost_class, hours_silent = _is_counterparty_ghost(obl)
if not ghost_class:
return False
# All conditions met: auto-resolve
if hours_since_evidence >= 24:
now_iso = datetime.utcnow().isoformat() + "Z"
closure_policy = obl.get("closure_policy", "counterparty_accepts")
# Build evidence_archive block
evidence_archive = {
"resolved_at": now_iso,
"resolved_by": "system",
"protocol": "Ghost Counterparty Protocol v1",
"closure_policy": closure_policy,
"resolution_reason": f"counterparty '{obl.get('counterparty')}' confirmed ghost "
f"({hours_silent:.0f}h silent), evidence submitted {hours_since_evidence:.0f}h ago, "
f"24h TTL exceeded. Auto-resolving.",
"evidence_count": len(evidence_refs),
"evidence_refs": evidence_refs,
"commitment": obl.get("commitment", ""),
"success_condition": obl.get("success_condition"),
}
obl["status"] = "resolved"
obl["_ttl_exceeded"] = True # Mark TTL as exceeded so _can_resolve knows to grant claimant resolve authority
obl["evidence_archive"] = evidence_archive
obl.setdefault("history", []).append({
"status": "resolved",
"at": now_iso,
"by": "system",
"resolution_type": "protocol_resolves",
"protocol": "Ghost Counterparty Protocol v1",
"reason": evidence_archive["resolution_reason"]
})
return True
return False
def _check_stale_accepted(obl):
"""Phase 5A: Nudge parties on accepted obligations that have been inactive for 48h.
Preventive nudge — catches the pre-evidence gap before it opens.
If neither party has acted in 48h on an accepted obligation, send a nudge
to both parties reminding them to submit evidence or update status.
Nudge repeats every 24h until status changes or deadline passes.
"""
if obl.get("status") != "accepted":
return False
last_activity = _obl_last_activity_iso(obl, exclude_system=True)
if not last_activity:
last_activity = obl.get("created_at", "")
hours_since = _hours_since_iso(last_activity) if last_activity else 999
if hours_since < 48:
return False
# Check if we already nudged recently (within 24h) to avoid spam
history = obl.get("history", [])
recent_nudge = None
for h in reversed(history[-5:]):
if h.get("event") == "stale_nudge" and h.get("by") == "system":
recent_nudge = h.get("at")
break
if recent_nudge and _hours_since_iso(recent_nudge) < 24:
return False # Already nudged within last 24h
deadline = obl.get("deadline_utc", "not set")
parties = [r.get("agent_id") for r in obl.get("role_bindings", [])]
parties = [p for p in parties if p]
if not parties:
return False
now_iso = datetime.utcnow().isoformat() + "Z"
for party in parties:
try:
_send_system_dm(party,
f"⏰ Obligation {obl.get('obligation_id')} has been in 'accepted' for {hours_since:.0f}h with no activity.\n"
f"Deadline: {deadline}\n"
f"Next step: submit evidence via POST /obligations/{obl.get('obligation_id')}/advance "
f"with status=evidence_submitted and evidence data, or post a checkpoint.\n"
f"If no action is taken, the obligation will auto-resolve via Ghost CP after deadline.",
msg_type="stale_nudge",
extra={"obligation_id": obl.get("obligation_id"), "hours_since_activity": hours_since})
except Exception:
pass
obl.setdefault("history", []).append({
"event": "stale_nudge",
"at": now_iso,
"by": "system",
"hours_since_activity": hours_since,
"notified_parties": parties,
"deadline": deadline,
})
return True
def _expire_obligations(obls):
"""Check all obligations for deadline expiry, watchdog state changes, and ghost timeouts."""
changed = False
for obl in obls:
if _check_deadline_expiry(obl):
changed = True
if _check_ghost_watchdog(obl):
changed = True
if _check_ghost_timeout(obl):
changed = True
if _check_proposed_ttl(obl): # Ghost Counterparty Protocol v1
changed = True
if _check_evidence_submitted_ttl(obl): # Ghost Counterparty Protocol v1
changed = True
if _check_stale_accepted(obl): # Phase 5A: stale nudge on accepted obligations
changed = True
if _check_deadline_elapsed_ttl(obl): # Phase 6: 72h auto-resolve on deadline_elapsed
changed = True
return changed
_CLOSURE_POLICIES = [
"claimant_self_attests",
"counterparty_accepts",
"claimant_plus_reviewer",
"reviewer_required",
"arbiter_rules",
"protocol_resolves", # Ghost Counterparty Protocol v1: protocol resolves when counterparty ghost + TTL elapsed
"unilateral_evidence", # Phase 5B: claimant can resolve unilaterally when counterparty ghost + evidence_submitted + TTL exceeded
]
# Policies that REQUIRE a deadline (obligations that can hang indefinitely without one)
_DEADLINE_REQUIRED_POLICIES = ["reviewer_required", "claimant_plus_reviewer", "counterparty_accepts"]
def _fire_obligation_state_webhook(obl, acting_agent, old_status, new_status, note=None):
"""Notify counterparty via callback_url + inbox DM when obligation state changes.
Fires to all parties EXCEPT the agent who made the change.
Supports obligation_webhook_url (dedicated) or falls back to callback_url."""
obl_id = obl.get("obligation_id", "unknown")
parties = [p.get("agent_id") for p in obl.get("parties", [])]
counterparties = [p for p in parties if p and p != acting_agent]
if not counterparties:
return
agents_data = load_agents()
now = datetime.utcnow().isoformat() + "Z"
for cp in counterparties:
# Build notification message
notify_msg = (
f"📋 Obligation {obl_id} state change: {old_status} → {new_status}\n"
f"Changed by: {acting_agent}"
)
if note:
notify_msg += f"\nNote: {note}"
notify_msg += f"\nView: GET /obligations/{obl_id}"
# Structured webhook payload
webhook_payload = {
"type": "obligation_state_change",
"obligation_id": obl_id,
"old_status": old_status,
"new_status": new_status,
"changed_by": acting_agent,
"note": note,
"timestamp": now,
"commitment": obl.get("commitment", "")[:200],
}
# Include settlement info if present
if obl.get("settlement"):
webhook_payload["settlement"] = {
"ref": obl["settlement"].get("settlement_ref"),
"state": obl["settlement"].get("settlement_state"),
"type": obl["settlement"].get("settlement_type"),
}
cp_agent = agents_data.get(cp) if isinstance(agents_data, dict) else None
# Try dedicated obligation_webhook_url first, then callback_url
webhook_url = None
if cp_agent:
webhook_url = cp_agent.get("obligation_webhook_url") or cp_agent.get("callback_url")
if webhook_url:
wh_safe, wh_err = _validate_callback_url(webhook_url)
if not wh_safe:
print(f"[OBL-WEBHOOK] SSRF blocked for {cp}: {wh_err}")
else:
try:
import requests as _req
_req.post(webhook_url, json=webhook_payload, timeout=5, allow_redirects=False)
print(f"[OBL-WEBHOOK] Notified {cp} via webhook: {old_status}→{new_status} on {obl_id}")
except Exception as e:
print(f"[OBL-WEBHOOK] Webhook to {cp} failed: {e}")
_send_system_dm(
cp,
notify_msg,
"obligation_state_change",
{
"obligation_id": obl_id,
"old_status": old_status,
"new_status": new_status,
},
)
def _obl_auth(obl, agent_id):
"""Check if agent_id is a party or role-bound actor in this obligation.
Uses case-insensitive matching to prevent silent auth failures from
case mismatches (e.g. cortana vs Cortana in role_bindings)."""
aid_lower = agent_id.lower()
if aid_lower in [p.get("agent_id", "").lower() for p in obl.get("parties", [])]:
return True
if aid_lower in [b.get("agent_id", "").lower() for b in obl.get("role_bindings", [])]:
return True
return False
def _can_resolve(obl, agent_id):
"""Check if agent_id has authority to resolve under the closure policy.
Special case: if status is deadline_elapsed (timeout_policy=claimant_self_resolve),
the claimant gets resolution authority regardless of closure_policy.
Reviewer judgment arriving after deadline is recorded as advisory.
"""
bindings = {b["role"]: b["agent_id"] for b in obl.get("role_bindings", [])}
def _match(role_key, fallback_key=None):
"""Case-insensitive agent_id match against role binding or fallback field."""
bound = bindings.get(role_key) or (obl.get(fallback_key) if fallback_key else None)
return bound and agent_id.lower() == bound.lower()
# After deadline elapsed, claimant gets self-resolve authority
if obl.get("status") == "deadline_elapsed" and obl.get("timeout_elapsed"):
if _match("claimant", "created_by"):
return True
# Reviewer can still resolve too (advisory becomes authoritative if they show up)
if _match("reviewer"):
return True
# After ghost default, the NON-SILENT party gets unilateral resolution authority.
# Identify who was silent from the watchdog history, grant authority to the other.
if obl.get("status") == "ghost_defaulted":
silent_party = None
for h in reversed(obl.get("history", [])):
if h.get("event") == "watchdog_default" or h.get("status") == "ghost_defaulted":
silent_party = h.get("silent_party")
break
if silent_party:
# Grant authority to whichever party is NOT the silent one
if agent_id.lower() != silent_party.lower():
if _match("claimant", "created_by") or _match("counterparty", "counterparty"):
return True
else:
# Fallback: both claimant and counterparty can resolve
if _match("claimant", "created_by") or _match("counterparty", "counterparty"):
return True
policy = obl.get("closure_policy", "counterparty_accepts")
# Phase 5B: claimant unilateral resolve when evidence_submitted + TTL exceeded.
# This MUST run before the policy-specific returns so it overrides counterparty_accepts.
# Solves: bilateral deadlock where claimant submitted evidence, counterparty is ghost/unresponsive,
# but system TTL didn't fire (e.g. counterparty_liveness_class = "active" despite being unreachable).
if (policy in ("counterparty_accepts", "claimant_self_attests") and
obl.get("status") == "evidence_submitted" and
obl.get("evidence_refs")):
last_evidence = obl.get("evidence_refs", [{}])[-1].get("submitted_at", "")
if last_evidence:
hours_since_evidence = _hours_since_iso(last_evidence) if last_evidence else 999
if hours_since_evidence >= 24 and _match("claimant", "created_by"):
return True
if policy == "claimant_self_attests":
return _match("claimant", "created_by")
elif policy == "counterparty_accepts":
return _match("counterparty", "counterparty")
elif policy == "claimant_plus_reviewer":
return _match("reviewer")
elif policy == "reviewer_required":
return _match("reviewer")
elif policy == "arbiter_rules":
return _match("arbiter")
elif policy == "protocol_resolves":
# Ghost Counterparty Protocol v1: either party can resolve once protocol is triggered.
return _match("claimant", "created_by") or _match("counterparty", "counterparty")
elif policy == "unilateral_evidence":
# Phase 5B: claimant can resolve unilaterally when counterparty ghost + evidence_submitted + TTL exceeded.
if _match("claimant", "created_by") and obl.get("status") == "evidence_submitted":
return True
return _match("counterparty", "counterparty")
return False
@obligations_bp.route("/obligations", methods=["GET"])
def list_obligations():
"""List obligations, optionally filtered by agent_id or status."""
obls = load_obligations()
if _expire_obligations(obls):
save_obligations(obls)
agent_id = request.args.get("agent_id")
status = request.args.get("status")
if agent_id:
obls = [o for o in obls if _obl_auth(o, agent_id)]
if status:
obls = [o for o in obls if o.get("status") == status]
return jsonify({"obligations": obls, "count": len(obls)})
def _detect_role_from_text(text: str) -> list[str]:
"""Detect role categories from obligation commitment text.
Runs all role keyword sets and returns all matches (an obligation can have multiple roles).
"""
REVIEWER_KW = ["review", "audit", "assess", "evaluate", "check", "verify", "code-review", "security-audit"]
BUILDER_KW = ["build", "implement", "write", "create", "develop", "ship", "code", "coding", "swe", "spec", "deliver"]
COORDINATOR_KW = ["coordinate", "delegate", "manage", "orchestrate", "oversee", "delegate", "route", "assign", "distribute", "workflow"]
RESEARCHER_KW = ["research", "investigate", "analyze", "study", "survey", "explore", "map", "discover", "synthesize", "measure", "analysis"]
SPARRING_KW = ["disagree", "challenge", "pressure-test", "red-team", "critique", "counter", "alternative", "hypothesis-pressure", "debate"]
t = text.lower()
roles = []
if any(kw in t for kw in REVIEWER_KW): roles.append("reviewer")
if any(kw in t for kw in BUILDER_KW): roles.append("builder")
if any(kw in t for kw in COORDINATOR_KW): roles.append("coordinator")
if any(kw in t for kw in RESEARCHER_KW): roles.append("researcher")
if any(kw in t for kw in SPARRING_KW): roles.append("sparring_partner")
return roles
@obligations_bp.route("/obligations", methods=["POST"])
def create_obligation():
"""Create a new obligation. Requires authenticated agent (from + secret)."""
data = request.get_json(silent=True) or {}
agent_id = data.get("from") or data.get("created_by")
secret = data.get("secret")
counterparty = data.get("counterparty")
commitment = data.get("commitment")
if not agent_id or not secret:
return jsonify({"error": "from and secret required"}), 400
# Verify agent
agents = load_agents()
if agent_id not in agents or agents[agent_id].get("secret") != secret:
return jsonify({"error": "invalid credentials"}), 401
if not counterparty or not commitment:
return jsonify({"error": "counterparty and commitment required"}), 400
# Ghost Counterparty Protocol v1: snapshot counterparty liveness at obligation creation
cp_info = agents.get(counterparty, {}) if isinstance(agents, dict) else {}
if not isinstance(cp_info, dict):
cp_info = {}
obl_id = f"obl-{uuid.uuid4().hex[:12]}"
now = datetime.utcnow().isoformat() + "Z"
closure_policy = data.get("closure_policy", "counterparty_accepts")
if closure_policy not in _CLOSURE_POLICIES:
return jsonify({"error": f"invalid closure_policy, must be one of: {_CLOSURE_POLICIES}"}), 400
deadline_utc = data.get("deadline_utc")
if closure_policy in _DEADLINE_REQUIRED_POLICIES and not deadline_utc:
return jsonify({"error": f"deadline_utc is required for closure_policy '{closure_policy}' (prevents indefinite hang)"}), 400
timeout_policy = data.get("timeout_policy", "claimant_self_resolve")
if timeout_policy not in _TIMEOUT_POLICIES:
return jsonify({"error": f"invalid timeout_policy, must be one of: {_TIMEOUT_POLICIES}"}), 400
# Validate all referenced agent IDs exist in registry (strict, case-sensitive)
referenced_ids = {counterparty}
custom_bindings = data.get("role_bindings")
if custom_bindings:
for rb in custom_bindings:
aid = rb.get("agent_id")
if aid:
referenced_ids.add(aid)
unknown_ids = [aid for aid in referenced_ids if aid not in agents]
if unknown_ids:
return jsonify({
"error": f"agent_id(s) not found in registry: {unknown_ids}. All parties and role_binding agent_ids must be registered Hub agents. Check exact case.",
"hint": "GET /agents to see registered agent IDs"
}), 400
# B: role_bindings required when binding_scope_text names agents
# Parse agent IDs mentioned in binding_scope_text and require them in role_bindings
scope_text = data.get("binding_scope_text") or ""
import re
mentioned_agents = set()
for word in scope_text.replace(".", " ").replace(",", " ").replace(":", " ").split():
# Check if word looks like an agent ID pattern (letters, numbers, underscores, dashes)
if re.match(r'^[a-zA-Z][a-zA-Z0-9_-]{2,30}$', word) and word in agents:
mentioned_agents.add(word)
binding_roles = {rb.get("agent_id") for rb in (custom_bindings or [])}
missing_from_bindings = mentioned_agents - binding_roles
if missing_from_bindings:
return jsonify({
"error": f"binding_scope_text names agent(s) not in role_bindings: {sorted(missing_from_bindings)}. "
f"When scope text names an agent, they must be added to role_bindings with a role.",
"hint": "Add all named agents to role_bindings: [{\"role\": \"resolver\", \"agent_id\": \"X\"}, ...]"
}), 400
obl = {
"obligation_id": obl_id,
"created_at": now,
"created_by": agent_id,
"counterparty": counterparty,
"parties": [
{"agent_id": agent_id},
{"agent_id": counterparty},
],
"role_bindings": data.get("role_bindings", [
{"role": "claimant", "agent_id": agent_id},
{"role": "counterparty", "agent_id": counterparty},
]),
"status": "proposed",
"commitment": commitment,
# discussed: proposed/draft text that led to the commitment (may differ from commitment).
# Separating these solves wrong-reference-frame errors: evaluators can distinguish
# a draft spec ("discussed") from the authoritative binding commitment.
"discussed": data.get("discussed"),
"success_condition": data.get("success_condition"),
"closure_policy": closure_policy,
"deadline_utc": deadline_utc,
"timeout_policy": timeout_policy,
"binding_scope_text": data.get("binding_scope_text"),
"vi_credential_ref": data.get("vi_credential_ref"),
"watchdog_config": data.get("watchdog_config"),
# Scope governance fields (bidirectional: post-hoc attestation + pre-authorization manifest)
"scope_declaration": data.get("scope_declaration"), # Declared capability envelope: {"read": [...], "write": [...], "exec": [...], "net": [...]}
"role_categories": data.get("role_categories") or _detect_role_from_text(commitment or ""), # Auto-detected + explicit override