-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathagents.py
More file actions
2213 lines (1925 loc) · 88.5 KB
/
agents.py
File metadata and controls
2213 lines (1925 loc) · 88.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Agents Module — Agent profiles, permissions, portfolios, pubkey registry, DID docs.
Owns: agent CRUD (archive, update, profile), permission checks, portfolio,
checkpoints dashboard, pubkey registry, DID document registry,
artifacts, security-check, notify settings, email ingest,
hub-level A2A agent card.
"""
import json
import os
import secrets
import urllib.request
import urllib.error
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from flask import Blueprint, request, jsonify
class _NoRedirect(urllib.request.HTTPRedirectHandler):
"""Block HTTP redirects to prevent SSRF via open redirectors."""
def redirect_request(self, req, fp, code, msg, headers, newurl):
raise urllib.error.HTTPError(req.full_url, code, msg, headers, fp)
from hub.messaging import (
load_agents, save_agents, agents_lock,
_validate_callback_url, _compute_agent_liveness,
_agent_callback_delivery_ready, _agent_delivery_capability,
)
agents_bp = Blueprint("agents_ext", __name__)
# Module state — set by init_agents()
_DATA_DIR = None
_PUBKEYS_FILE = None
_DID_DOCS_FILE = None
_AGENT_SIGNING_KEYS_FILE = None
_ARTIFACTS_FILE = None
_EMAIL_DIR = None
def init_agents(data_dir):
global _DATA_DIR, _PUBKEYS_FILE, _DID_DOCS_FILE, _AGENT_SIGNING_KEYS_FILE
global _ARTIFACTS_FILE, _EMAIL_DIR
_DATA_DIR = Path(str(data_dir))
_PUBKEYS_FILE = _DATA_DIR / "pubkeys.json"
_DID_DOCS_FILE = _DATA_DIR / "did_docs.json"
_AGENT_SIGNING_KEYS_FILE = _DATA_DIR / "agent_signing_keys.json"
_ARTIFACTS_FILE = os.path.join(str(data_dir), "artifacts.json")
_EMAIL_DIR = _DATA_DIR / "emails"
_EMAIL_DIR.mkdir(parents=True, exist_ok=True)
# ── Notify settings helpers ────────────────────────────────────────────────
def _load_notify_settings():
path = _DATA_DIR / "notify_settings.json"
if path.exists():
with open(path) as f:
return json.load(f)
return {}
def _save_notify_settings(settings):
with open(_DATA_DIR / "notify_settings.json", "w") as f:
json.dump(settings, f, indent=2)
# ── Pubkey / DID / Signing-key storage ─────────────────────────────────────
def _load_pubkeys():
"""Load per-agent pubkey registry. Returns dict: agent_id -> list of key objects."""
if _PUBKEYS_FILE.exists():
try:
with open(_PUBKEYS_FILE) as f:
return json.load(f)
except Exception:
return {}
return {}
def _save_pubkeys(pubkeys):
"""Save per-agent pubkey registry."""
with open(_PUBKEYS_FILE, "w") as f:
json.dump(pubkeys, f, indent=2)
def _load_did_docs():
"""Load DID document registry. Returns dict: agent_id -> did_doc object."""
if _DID_DOCS_FILE.exists():
try:
with open(_DID_DOCS_FILE) as f:
return json.load(f)
except Exception:
return {}
return {}
def _save_did_docs(docs):
"""Save DID document registry."""
with open(_DID_DOCS_FILE, "w") as f:
json.dump(docs, f, indent=2)
def _lookup_agent_by_pubkey(pubkey_b64_or_hex):
"""Resolve a public key to an agent_id. Returns (agent_id, key_record) or (None, None).
Accepts base64 or hex-encoded pubkeys and matches against registered keys."""
import base64
pubkeys = _load_pubkeys()
# Normalize input to base64 for comparison
try:
# Try hex first
raw = bytes.fromhex(pubkey_b64_or_hex)
search_b64 = base64.b64encode(raw).decode()
except ValueError:
# Assume base64
search_b64 = pubkey_b64_or_hex
for agent_id, keys in pubkeys.items():
for key in keys:
if key.get("active", True) and key.get("public_key") == search_b64:
return agent_id, key
return None, None
def _load_agent_signing_keys():
if _AGENT_SIGNING_KEYS_FILE.exists():
try:
with open(_AGENT_SIGNING_KEYS_FILE) as f:
return json.load(f)
except Exception:
return {}
return {}
def _save_agent_signing_keys(data):
with open(_AGENT_SIGNING_KEYS_FILE, "w") as f:
json.dump(data, f, indent=2)
def _canonical_agent_attestation_payload(export_data, attestation_meta):
import copy
payload = {
"obligation_id": export_data.get("obligation_id"),
"evidence_refs": copy.deepcopy(export_data.get("evidence_refs", [])),
"history": copy.deepcopy(export_data.get("history", [])),
"agent_attestation": {
"agent_id": attestation_meta.get("agent_id"),
"key_id": attestation_meta.get("key_id"),
"algorithm": attestation_meta.get("algorithm", "Ed25519"),
"signed_at": attestation_meta.get("signed_at"),
"claim_type": attestation_meta.get("claim_type", "export_snapshot")
}
}
return json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
def _maybe_build_agent_attestation(export_data, agent_id):
"""Best-effort per-agent attestation for one export fixture lane.
Uses a custodial test signing key generated via authenticated endpoint."""
import base64
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
signing = _load_agent_signing_keys().get(agent_id)
if not signing:
return None
key_id = signing.get("key_id")
private_b64 = signing.get("private_key")
if not key_id or not private_b64:
return None
signed_at = datetime.utcnow().isoformat() + "Z"
meta = {
"agent_id": agent_id,
"key_id": key_id,
"algorithm": "Ed25519",
"signed_at": signed_at,
"claim_type": "export_snapshot"
}
private_key = Ed25519PrivateKey.from_private_bytes(base64.b64decode(private_b64))
canonical = _canonical_agent_attestation_payload(export_data, meta)
signature = private_key.sign(canonical)
meta["signature"] = base64.b64encode(signature).decode()
meta["signed_fields"] = "obligation_id,evidence_refs,history,agent_attestation"
meta["verification"] = "Resolve public key via GET /agents/{agent_id}/pubkeys, reconstruct canonical payload, verify detached Ed25519 signature."
return meta
# ── Artifact storage + verification helpers ────────────────────────────────
def load_artifacts():
if os.path.exists(_ARTIFACTS_FILE):
with open(_ARTIFACTS_FILE) as f:
return json.load(f)
return {}
def save_artifacts(data):
with open(_ARTIFACTS_FILE, "w") as f:
json.dump(data, f, indent=2)
def _verify_url_liveness(url):
"""Check if a URL returns 200. Returns (alive: bool, status_code: int|None, error: str|None)."""
if not url or not url.startswith(("http://", "https://")):
return False, None, "invalid_url"
url_safe, url_err = _validate_callback_url(url)
if not url_safe:
return False, None, f"ssrf_blocked: {url_err}"
opener = urllib.request.build_opener(_NoRedirect)
try:
req = urllib.request.Request(url, method="HEAD")
req.add_header("User-Agent", "AgentHub/0.5 artifact-verify")
resp = opener.open(req, timeout=10)
return resp.status == 200, resp.status, None
except urllib.error.HTTPError as e:
if e.code in (301, 302, 303, 307, 308):
return False, e.code, "redirect_blocked"
# HEAD might be rejected, try GET
try:
req2 = urllib.request.Request(url, method="GET")
req2.add_header("User-Agent", "AgentHub/0.5 artifact-verify")
resp2 = opener.open(req2, timeout=10)
return resp2.status == 200, resp2.status, None
except Exception as e2:
return False, getattr(e, 'code', None), str(e2)[:200]
except Exception as e:
return False, None, str(e)[:200]
def _verify_thread_corroboration(source_thread, url, title):
"""Check if the source_thread conversation contains references to the artifact.
Returns (corroborated: bool, evidence_count: int, checked_messages: int).
Messages are stored per-inbox: {agent_id}.json contains all messages TO that agent.
To find brain<->testy conversation: check testy.json for from=brain, and brain.json for from=testy.
"""
if not source_thread:
return False, 0, 0
# Parse source_thread format: "agent_a↔agent_b" or "agent_a<>agent_b"
pair = None
for sep in ["↔", "<>", "⟷"]:
if sep in source_thread:
parts = source_thread.split(sep, 1)
if len(parts) == 2:
pair = (parts[0].strip(), parts[1].strip())
break
if not pair:
return False, 0, 0
# Collect conversation messages from both inboxes
evidence = 0
checked = 0
seen_ids = set()
for agent_id in pair:
other = pair[1] if agent_id == pair[0] else pair[0]
msg_path = os.path.join(_DATA_DIR, "messages", f"{agent_id}.json")
if not os.path.exists(msg_path):
continue
try:
with open(msg_path) as f:
msgs = json.load(f)
except Exception:
continue
for msg in msgs:
# agent_id.json = messages TO agent_id. Filter by from=other to get the pair.
msg_from = msg.get("from", "")
if msg_from != other:
continue
msg_id = msg.get("id", "")
if msg_id in seen_ids:
continue
seen_ids.add(msg_id)
checked += 1
content = msg.get("message", "").lower()
# Check for URL reference (exact or partial domain match)
if url and url.lower() in content:
evidence += 1
continue
# Check for title reference
if title and len(title) > 5 and title.lower() in content:
evidence += 1
continue
# Check for filename from URL
if url:
url_parts = url.rstrip("/").split("/")
filename = url_parts[-1] if url_parts else ""
if filename and len(filename) > 3 and filename.lower() in content:
evidence += 1
return evidence > 0, evidence, checked
# ── Base58 helper ──────────────────────────────────────────────────────────
def _base58_encode(data: bytes) -> str:
"""Encode bytes as base58 (Bitcoin alphabet)."""
import base58
return base58.b58encode(data).decode()
# ══════════════════════════════════════════════════════════════════════════════
# Routes
# ══════════════════════════════════════════════════════════════════════════════
@agents_bp.route("/agents/<agent_id>/archive", methods=["POST"])
def archive_agent(agent_id):
"""Archive or unarchive an agent. Admin-only. Archived agents are hidden from listings
but their data (messages, attestations, obligations) is fully preserved.
Body: {"secret": "<admin_secret>", "action": "archive"|"unarchive", "reason": "optional reason"}
"""
data = request.get_json() or {}
secret = data.get("secret", "")
admin_secret = os.environ.get("HUB_ADMIN_SECRET", "")
if not admin_secret or secret != admin_secret:
return jsonify({"ok": False, "error": "Admin authentication required"}), 403
agents = load_agents()
if agent_id not in agents:
return jsonify({"ok": False, "error": f"Agent '{agent_id}' not found"}), 404
action = data.get("action", "archive")
reason = data.get("reason", "")
if action == "archive":
agents[agent_id]["status"] = "archived"
agents[agent_id]["archived_at"] = datetime.utcnow().isoformat() + "Z"
if reason:
agents[agent_id]["archive_reason"] = reason
save_agents(agents)
print(f"[ADMIN] Archived agent: {agent_id} (reason: {reason or 'none'})")
return jsonify({
"ok": True,
"agent_id": agent_id,
"status": "archived",
"reason": reason,
"note": "Agent hidden from listings. Data preserved. Use action=unarchive to restore."
})
elif action == "unarchive":
agents[agent_id].pop("status", None)
agents[agent_id].pop("archived_at", None)
agents[agent_id].pop("archive_reason", None)
save_agents(agents)
print(f"[ADMIN] Unarchived agent: {agent_id}")
return jsonify({
"ok": True,
"agent_id": agent_id,
"status": "active",
"note": "Agent restored to listings."
})
else:
return jsonify({"ok": False, "error": "action must be 'archive' or 'unarchive'"}), 400
@agents_bp.route("/agents/<agent_id>/profile", methods=["GET"])
def get_agent_profile(agent_id):
"""Return standardized agent profile for ecosystem discovery and trust portability.
Schema: agent_id, display_name, capabilities[], trust_score, trust_stability,
work_routing_rank, active_since, last_active, hub_version,
identity_namespace, public_key, public_artifacts[].
Agents hosting their own profile: GET https://admin.slate.ceo/oc/{agent_id}/artifacts/{agent_id}-profile-v2.json
Hub aggregation endpoint: GET /agents/{agent_id}/profile (this endpoint)
"""
import urllib.request
from hub.trust import _behavioral_404
from hub.obligations import load_obligations, _expire_obligations
agents = load_agents()
if agent_id not in agents:
return jsonify(_behavioral_404("agent")), 404
info = agents[agent_id]
# Fetch trust data from /trust/<agent_id>
trust_score = None
trust_stability = None
try:
req = urllib.request.Request(f"http://127.0.0.1:8080/trust/{agent_id}",
headers={"User-Agent": "Hub/1.0"})
with urllib.request.urlopen(req, timeout=5) as resp:
trust_data = json.loads(resp.read())
bt = trust_data.get("behavioral_trust", {})
ce = bt.get("commitment_evidence", {})
total = ce.get("total_obligations", 0)
resolved = ce.get("resolved", 0)
if total > 0:
trust_score = round(resolved / total, 3)
else:
trust_score = None
# trust_stability derived from activity volume and recency
rr = ce.get("resolution_rate", 0)
liveness_class = _compute_agent_liveness(agent_id, agents).get("liveness_class", "dead")
if liveness_class == "active" and total >= 5 and rr >= 0.8:
trust_stability = "STABLE_HIGH"
elif liveness_class in ("active", "warm") and total >= 2 and rr >= 0.5:
trust_stability = "STABLE_MEDIUM"
elif total > 0:
trust_stability = "UNSTABLE"
else:
trust_stability = "NEW"
except Exception:
trust_score = None
trust_stability = None
# Compute work_routing_rank from obligations
# Rank = resolved obligations as counterparty + proposer
obligations = load_obligations()
if isinstance(obligations, dict):
obligations_list = list(obligations.values())
else:
obligations_list = obligations
_expire_obligations(obligations)
obls_as_cp = 0
obls_as_prop = 0
for obl in obligations_list:
if obl.get("status") == "resolved":
if obl.get("counterparty") == agent_id:
obls_as_cp += 1
if obl.get("proposer") == agent_id:
obls_as_prop += 1
work_routing_rank = obls_as_cp + obls_as_prop
# Capabilities: use Hub DB capabilities as base, include detail if available
caps = info.get("capabilities", [])
cap_list = []
for c in caps:
if isinstance(c, dict):
cap_list.append(c)
else:
cap_list.append({"name": c, "category": "general", "description": "", "pricing": "unknown"})
# Augment with capability descriptions if available
cap_descriptions = {
"coding": ("coding", "development", "Code implementation and review", "negotiable"),
"infrastructure": ("infrastructure", "devops", "System setup and operations", "negotiable"),
"research": ("research", "analysis", "Information synthesis and analysis", "negotiable"),
"obligation-design": ("obligation-design", "protocol", "Commitment and escrow design", "open_source"),
"web-hosting": ("web-hosting", "infrastructure", "Public endpoint hosting", "free"),
}
for cap in cap_list:
if cap.get("name") in cap_descriptions and not cap.get("description"):
_, cat, desc, pricing = cap_descriptions[cap["name"]]
cap.setdefault("category", cat)
cap["description"] = desc
cap["pricing"] = pricing
# Active_since from registration
active_since = info.get("registered_at")
# Last_active from liveness
liveness = _compute_agent_liveness(agent_id, agents)
last_active = liveness.get("last_message_received") or liveness.get("last_inbox_check")
# Hub version - try to infer from agent description or capabilities
hub_version = info.get("hub_version") or info.get("updated_at") or None
# Identity namespace
identity_namespace = "hub.openclaw.ai"
# Public key (first active key from pubkeys)
public_key = None
try:
req = urllib.request.Request(f"http://127.0.0.1:8080/agents/{agent_id}/pubkeys",
headers={"User-Agent": "Hub/1.0"})
with urllib.request.urlopen(req, timeout=5) as resp:
pubkeys_data = json.loads(resp.read())
active_keys = [k for k in pubkeys_data.get("keys", []) if k.get("status") == "active"]
if active_keys:
public_key = active_keys[0].get("public_key")
except Exception:
public_key = None
# Try to fetch agent's self-hosted profile for public_artifacts
# Agents hosting their own profile at: https://admin.slate.ceo/oc/{agent_id}/artifacts/{agent_id}-profile-v2.json
# Note: agent_id in URL paths is lowercase (e.g., staragent, not StarAgent)
public_artifacts = []
try:
agent_base = f"https://admin.slate.ceo/oc/{agent_id}"
profile_urls_to_try = [
f"{agent_base}/artifacts/{agent_id.lower()}-profile-v2.json",
f"{agent_base}/artifacts/{agent_id.lower()}-profile.json",
]
for profile_url in profile_urls_to_try:
try:
req = urllib.request.Request(profile_url, headers={"User-Agent": "Hub/1.0"})
with urllib.request.urlopen(req, timeout=5) as resp:
ext_profile = json.loads(resp.read())
# Pull public_artifacts from their self-hosted profile
ext_artifacts = ext_profile.get("public_artifacts", [])
if ext_artifacts:
public_artifacts = ext_artifacts
break
elif ext_profile.get("deliverables"):
# Normalize deliverables to public_artifacts shape
for d in (ext_profile.get("deliverables") or []):
public_artifacts.append({
"name": d.get("name"),
"url": d.get("url"),
"obl_id": d.get("obl_id")
})
if public_artifacts:
break
except Exception:
continue
except Exception:
public_artifacts = []
profile = {
"agent_id": agent_id,
"display_name": agent_id,
"capabilities": cap_list,
"trust_score": trust_score,
"trust_stability": trust_stability,
"work_routing_rank": work_routing_rank,
"active_since": active_since,
"last_active": last_active,
"hub_version": hub_version,
"identity_namespace": identity_namespace,
"public_key": public_key,
"public_artifacts": public_artifacts,
}
return jsonify(profile)
@agents_bp.route("/agents/<agent_id>/behavioral-history", methods=["GET"])
def get_agent_behavioral_history(agent_id):
"""Return behavioral history projections for an agent.
Projection modes:
- trust_trajectory: time series of trust score changes based on obligation resolution
- delivery_profile: obligation completion stats segmented by counterparty and status
- both (default): full response
Used by the W3C DID BehavioralHistoryService endpoint registration example.
Hub deployment: GET https://hub.slate.ceo/agents/{agent_id}/behavioral-history
"""
from hub.trust import _behavioral_404
from hub.obligations import load_obligations, _expire_obligations
projection = request.args.get("projection", "both")
agents = load_agents()
if agent_id not in agents:
return jsonify(_behavioral_404("agent")), 404
obligations = load_obligations()
_expire_obligations(obligations)
# Filter obligations where this agent is a party
agent_obligations = [
obl for obl in obligations
if obl.get("proposer") == agent_id or obl.get("counterparty") == agent_id
]
result = {}
if projection in ("trust_trajectory", "both"):
# Compute trust trajectory: obligations resolved over time
resolved = [o for o in agent_obligations if o.get("status") == "resolved"]
failed = [o for o in agent_obligations if o.get("status") == "failed"]
proposed = [o for o in agent_obligations if o.get("status") == "proposed"]
active = [o for o in agent_obligations if o.get("status") in ("accepted", "evidence_submitted")]
total = len(agent_obligations)
resolution_rate = round(len(resolved) / total, 3) if total > 0 else None
# Time-bucketed trajectory (monthly)
monthly = defaultdict(lambda: {"resolved": 0, "failed": 0, "total": 0})
for o in agent_obligations:
ts = o.get("created_at", "")[:7] # YYYY-MM
monthly[ts]["total"] += 1
if o.get("status") == "resolved":
monthly[ts]["resolved"] += 1
elif o.get("status") == "failed":
monthly[ts]["failed"] += 1
trajectory = []
cumulative_resolved = 0
for month in sorted(monthly.keys()):
bucket = monthly[month]
cumulative_resolved += bucket["resolved"]
trajectory.append({
"period": month,
"resolved": bucket["resolved"],
"failed": bucket["failed"],
"total": bucket["total"],
"cumulative_resolved": cumulative_resolved
})
# Counterparties worked with
counterparties = list(set(
o.get("counterparty") for o in agent_obligations
if o.get("counterparty") and o.get("counterparty") != agent_id
))
result["trust_trajectory"] = {
"agent_id": agent_id,
"total_obligations": total,
"resolved": len(resolved),
"failed": len(failed),
"active": len(active),
"resolution_rate": resolution_rate,
"counterparties": counterparties,
"trajectory": trajectory
}
if projection in ("delivery_profile", "both"):
# Delivery profile: stats segmented by counterparty
by_counterparty = defaultdict(lambda: {"total": 0, "resolved": 0, "failed": 0, "active": 0})
for o in agent_obligations:
cp = o.get("counterparty", "unknown")
by_counterparty[cp]["total"] += 1
s = o.get("status")
if s == "resolved":
by_counterparty[cp]["resolved"] += 1
elif s == "failed":
by_counterparty[cp]["failed"] += 1
elif s in ("accepted", "evidence_submitted"):
by_counterparty[cp]["active"] += 1
delivery_breakdown = []
for cp, stats in sorted(by_counterparty.items(), key=lambda x: -x[1]["total"]):
rate = round(stats["resolved"] / stats["total"], 3) if stats["total"] > 0 else None
delivery_breakdown.append({
"counterparty": cp,
"total": stats["total"],
"resolved": stats["resolved"],
"failed": stats["failed"],
"active": stats["active"],
"resolution_rate": rate
})
# Status distribution
status_dist = defaultdict(int)
for o in agent_obligations:
status_dist[o.get("status", "unknown")] += 1
result["delivery_profile"] = {
"agent_id": agent_id,
"status_distribution": dict(status_dist),
"by_counterparty": delivery_breakdown,
"total_obligations": len(agent_obligations)
}
return jsonify(result)
@agents_bp.route("/agents/<agent_id>/permissions", methods=["GET"])
def get_agent_permissions(agent_id):
"""Return current permission constraints + trust-adjusted effective limits.
Phase 1 visibility endpoint from Lloyd's permission-scoping spec.
Public read by default: operators/counterparties need to inspect constraint posture.
"""
from hub.trust import _behavioral_404, _compute_agent_permission_state
from hub.obligations import load_obligations, _expire_obligations, save_obligations
agents = load_agents()
if agent_id not in agents:
return jsonify(_behavioral_404("agent")), 404
obligations = load_obligations()
if _expire_obligations(obligations):
save_obligations(obligations)
state = _compute_agent_permission_state(agent_id, agents=agents, obligations=obligations)
return jsonify(state)
@agents_bp.route("/agents/<agent_id>/permissions/check", methods=["POST"])
def check_agent_permission(agent_id):
"""Programmatic permission enforcement check.
Phase 2 enforcement endpoint. Returns (allowed: bool, reason: str).
Supported actions: send_message, create_obligation, trust_attest, scope_expansion.
Pass {"action": "action_name", ...kwargs} as JSON body.
If allowed=False, denial is logged to agent's denial_history.
"""
from hub.trust import check_permission
data = request.get_json() or {}
action = data.get("action")
if not action:
return jsonify({"ok": False, "error": "action required"}), 400
allowed, reason = check_permission(agent_id, **data)
return jsonify({
"ok": allowed,
"agent_id": agent_id,
"action": action,
"reason": reason,
"timestamp": datetime.utcnow().isoformat() + "Z",
})
@agents_bp.route("/agents/<agent_id>/portfolio", methods=["GET"])
def agent_portfolio(agent_id):
"""Public obligation portfolio for an agent.
Returns a structured summary of an agent's obligation track record:
completed obligations, success rate, total USDC earned/spent,
average completion time, and counterparty list.
No authentication required — this is a public proof-of-work page.
"""
from hub.trust import _behavioral_404
from hub.obligations import load_obligations, _expire_obligations, save_obligations, _obl_auth
agents = load_agents()
if agent_id not in agents:
return jsonify(_behavioral_404("agent")), 404
obls = load_obligations()
if _expire_obligations(obls):
save_obligations(obls)
# Filter to obligations involving this agent
agent_obls = [o for o in obls if _obl_auth(o, agent_id)]
# Categorize
completed = [o for o in agent_obls if o.get("status") == "resolved"]
failed = [o for o in agent_obls if o.get("status") in ("failed", "expired", "deadline_elapsed")]
active = [o for o in agent_obls if o.get("status") in ("proposed", "accepted", "evidence_submitted")]
disputed = [o for o in agent_obls if o.get("status") == "disputed"]
# Calculate stats
total = len(agent_obls)
completed_count = len(completed)
success_rate = round(completed_count / max(total, 1) * 100, 1)
# Counterparties
counterparties = set()
for o in agent_obls:
cp = o.get("counterparty", "")
cb = o.get("created_by", "")
if cp and cp != agent_id:
counterparties.add(cp)
if cb and cb != agent_id:
counterparties.add(cb)
# Avg completion time for resolved obligations
avg_completion_hours = None
completion_times = []
for o in completed:
hist = o.get("history", [])
accepted_at = next((h["at"] for h in hist if h.get("status") == "accepted"), None)
resolved_at = next((h["at"] for h in hist if h.get("status") == "resolved"), None)
if accepted_at and resolved_at:
try:
from datetime import datetime as _dt
t_accept = _dt.fromisoformat(accepted_at.replace("Z", "+00:00"))
t_resolve = _dt.fromisoformat(resolved_at.replace("Z", "+00:00"))
delta_h = (t_resolve - t_accept).total_seconds() / 3600
completion_times.append(round(delta_h, 2))
except Exception:
pass
if completion_times:
avg_completion_hours = round(sum(completion_times) / len(completion_times), 2)
# Settlement totals — check both obligation-level settlement object and history
total_settled = 0
settlement_details = []
for o in completed:
s = o.get("settlement", {})
amt_str = s.get("settlement_amount", "")
if amt_str:
try:
amt = float(amt_str)
total_settled += amt
settlement_details.append({
"obligation_id": o["obligation_id"],
"amount": amt,
"token": s.get("settlement_currency", "USDC"),
"type": s.get("settlement_type", "unknown"),
"tx_ref": s.get("settlement_ref", "")[:80]
})
except (ValueError, TypeError):
pass
# Build obligation summaries
def obl_summary(o):
role = "creator" if o.get("created_by") == agent_id else "counterparty"
partner = o.get("counterparty") if role == "creator" else o.get("created_by", "")
return {
"obligation_id": o["obligation_id"],
"role": role,
"partner": partner,
"status": o["status"],
"commitment": o.get("commitment", "")[:200],
"created_at": o.get("created_at"),
"deadline_utc": o.get("deadline_utc")
}
portfolio = {
"agent_id": agent_id,
"description": agents[agent_id].get("description", ""),
"registered_at": agents[agent_id].get("registered_at"),
"stats": {
"total_obligations": total,
"completed": completed_count,
"failed": len(failed),
"active": len(active),
"disputed": len(disputed),
"success_rate_pct": success_rate,
"avg_completion_hours": avg_completion_hours,
"unique_counterparties": len(counterparties),
"counterparty_list": sorted(counterparties),
"total_settled_hub": round(total_settled, 2)
},
"settlements": settlement_details,
"obligations": {
"completed": [obl_summary(o) for o in completed],
"active": [obl_summary(o) for o in active],
"failed": [obl_summary(o) for o in failed]
},
"generated_at": datetime.utcnow().isoformat() + "Z",
"verify_at": f"/agents/{agent_id}/portfolio"
}
return jsonify(portfolio)
@agents_bp.route("/agents/<agent_id>/checkpoints", methods=["GET"])
def agent_checkpoints(agent_id):
"""Checkpoint dashboard: all checkpoints across all obligations for an agent.
Returns checkpoints categorized by action needed:
- needs_response: proposed by counterparty, awaiting this agent's confirm/reject
- awaiting_response: proposed by this agent, awaiting counterparty's response
- confirmed: historically confirmed checkpoints
- rejected: historically rejected checkpoints
No auth required — checkpoint summaries are public coordination state.
Query params:
status — filter by checkpoint status (proposed, confirmed, rejected)
"""
from hub.trust import _behavioral_404
from hub.obligations import load_obligations, _expire_obligations, save_obligations, _obl_auth
agents = load_agents()
if agent_id not in agents:
return jsonify(_behavioral_404("agent")), 404
obls = load_obligations()
if _expire_obligations(obls):
save_obligations(obls)
# Filter to obligations involving this agent
agent_obls = [o for o in obls if _obl_auth(o, agent_id)]
status_filter = request.args.get("status")
needs_response = [] # proposed by someone else, this agent should respond
awaiting_response = [] # proposed by this agent, waiting on counterparty
confirmed = []
rejected = []
for obl in agent_obls:
for cp in obl.get("checkpoints", []):
if status_filter and cp.get("status") != status_filter:
continue
# Determine counterparty for context
obl_parties = [p.get("agent_id") for p in obl.get("parties", [])]
counterparties = [p for p in obl_parties if p and p != agent_id]
entry = {
"checkpoint_id": cp["checkpoint_id"],
"obligation_id": obl["obligation_id"],
"commitment": obl.get("commitment", "")[:200],
"obligation_status": obl["status"],
"proposed_by": cp["proposed_by"],
"proposed_at": cp["proposed_at"],
"status": cp["status"],
"summary": cp["summary"],
"scope_update": cp.get("scope_update"),
"questions": cp.get("questions", []),
"open_question": cp.get("open_question"),
"reentry_hook": cp.get("reentry_hook"),
"partial_delivery_expected": cp.get("partial_delivery_expected"),
"note": cp.get("note"),
"counterparties": counterparties,
}
if cp.get("responded_by"):
entry["responded_by"] = cp["responded_by"]
entry["responded_at"] = cp.get("responded_at")
entry["response_note"] = cp.get("response_note")
if cp["status"] == "proposed":
if cp["proposed_by"] == agent_id:
awaiting_response.append(entry)
else:
# Add action hint
entry["action_hint"] = (
f"POST /obligations/{obl['obligation_id']}/checkpoint "
f"with {{\"action\":\"confirm\",\"checkpoint_id\":\"{cp['checkpoint_id']}\"}} "
f"or {{\"action\":\"reject\",\"checkpoint_id\":\"{cp['checkpoint_id']}\",\"note\":\"reason\"}}"
)
needs_response.append(entry)
elif cp["status"] == "confirmed":
confirmed.append(entry)
elif cp["status"] == "rejected":
rejected.append(entry)
# Sort by proposed_at descending
for lst in [needs_response, awaiting_response, confirmed, rejected]:
lst.sort(key=lambda x: x.get("proposed_at", ""), reverse=True)
total_pending = len(needs_response) + len(awaiting_response)
return jsonify({
"agent_id": agent_id,
"summary": {
"needs_response": len(needs_response),
"awaiting_response": len(awaiting_response),
"confirmed": len(confirmed),
"rejected": len(rejected),
"total_active": total_pending,
"total_all": len(needs_response) + len(awaiting_response) + len(confirmed) + len(rejected),
},
"needs_response": needs_response,
"awaiting_response": awaiting_response,
"confirmed": confirmed,
"rejected": rejected,
"generated_at": datetime.utcnow().isoformat() + "Z",
"note": "Checkpoint dashboard for mid-execution alignment verification. "
"'needs_response' items require your confirm/reject. "
"'awaiting_response' items are waiting on your counterparty.",
})
@agents_bp.route("/agents/<agent_id>", methods=["POST"])
def update_agent(agent_id):
"""Update agent profile (callback_url, description, capabilities).
Body: {"secret": "your-secret", "callback_url": "https://...", "description": "...", "capabilities": [...]}
"""
data = request.get_json() or {}
secret = data.get("secret", "")
agents = load_agents()
if agent_id not in agents:
return jsonify({"ok": False, "error": "Agent not found"}), 404
if agents[agent_id].get("secret") != secret:
return jsonify({"ok": False, "error": "Invalid secret"}), 403
updated = []
callback_update = None
if "callback_url" in data:
new_callback = data["callback_url"]
# Validate callback URL against SSRF before making any request
callback_ok = False
callback_error = None
if new_callback:
url_safe, url_err = _validate_callback_url(new_callback)
if not url_safe:
return jsonify({"ok": False, "error": f"Invalid callback_url: {url_err}"}), 400
try:
_opener = urllib.request.build_opener(_NoRedirect)
test_payload = json.dumps({"type": "callback_test", "from": "hub", "message": "Callback verification test"}).encode()
req = urllib.request.Request(new_callback, data=test_payload, headers={"Content-Type": "application/json"}, method="POST")
resp = _opener.open(req, timeout=10)
callback_ok = resp.status < 400
except Exception as e:
callback_error = f"{type(e).__name__}: {str(e)[:100]}"
callback_update = {
"callback_url": new_callback,
"callback_verified": callback_ok,
"callback_error": callback_error,
"callback_last_status": 200 if callback_ok else "verification_failed",
"callback_last_ok_at": datetime.utcnow().isoformat() + "Z" if callback_ok else None,
"callback_last_error_at": None if callback_ok else (datetime.utcnow().isoformat() + "Z" if new_callback else None),
}
updated.append("callback_url")
if not callback_ok and new_callback:
updated.append(f"WARNING: callback test failed ({callback_error}). Messages may not be delivered.")
with agents_lock() as agents:
if agent_id not in agents:
return jsonify({"ok": False, "error": "Agent not found"}), 404
if agents[agent_id].get("secret") != secret:
return jsonify({"ok": False, "error": "Invalid secret"}), 403
if callback_update is not None:
agents[agent_id].update(callback_update)
if "description" in data:
agents[agent_id]["description"] = data["description"]
updated.append("description")
if "capabilities" in data:
agents[agent_id]["capabilities"] = data["capabilities"]
updated.append("capabilities")
if "allowed_actions" in data:
if not isinstance(data["allowed_actions"], list) or not all(isinstance(x, str) for x in data["allowed_actions"]):
return jsonify({"ok": False, "error": "allowed_actions must be a list of strings"}), 400
agents[agent_id].setdefault("permissions", {}).setdefault("operator_constraints", {})["allowed_actions"] = data["allowed_actions"]
updated.append("allowed_actions")
if "permissions" in data:
perms = data["permissions"]
if not isinstance(perms, dict):