diff --git a/sdk/python/.gitignore b/sdk/python/.gitignore new file mode 100644 index 0000000..9228784 --- /dev/null +++ b/sdk/python/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.pyc +*.egg-info/ +dist/ diff --git a/sdk/python/README.md b/sdk/python/README.md new file mode 100644 index 0000000..8c01abc --- /dev/null +++ b/sdk/python/README.md @@ -0,0 +1,72 @@ +# Open Agent Trust Registry — Python SDK + +Python port of the TypeScript SDK (`sdk/typescript/`). Verifies agent +attestation tokens locally against the registry manifest — no network calls +per request, pure local computation, target `<1ms`. + +## Installation + +```bash +pip install open-agent-trust +``` + +## Quick Start + +```python +import json +from open_agent_trust import verify_attestation, RegistryManifest, RevocationList + +# Load the manifest and revocations you fetched and cached locally +with open("manifest.json") as f: + manifest = RegistryManifest.from_dict(json.load(f)) + +with open("revocations.json") as f: + revocations = RevocationList.from_dict(json.load(f)) + +# Verify an incoming attestation token +result = verify_attestation( + attestation_jws=token, + manifest=manifest, + revocations=revocations, + expected_audience="https://your-api.com", +) + +if result.valid: + # Safe to use result.claims.scope / result.claims.constraints + print("Agent verified:", result.issuer.issuer_id) + print("Scopes:", result.claims.scope) +else: + print("Rejected:", result.reason) +``` + +## Verification Protocol + +Implements the 14-step protocol from `spec/03-verification.md`: + +1. Parse JWS and extract `iss` / `kid` from protected header +2. Fast-reject against the revocation list (O(n) scan, expected empty) +3. Look up the issuer in the manifest +4. Reject unknown issuer +5. Reject suspended / revoked issuer +6. Locate key by `kid` in the issuer's `public_keys` array +7. Reject unknown key +8. Reject revoked key +9. Enforce 90-day grace period for deprecated keys +10. Reject expired registry public key +11–12. Verify Ed25519 signature cryptographically +13. Check `aud`, `exp`, and optional `nonce` claims +14. Return `VerificationResult(valid=True, issuer=..., claims=...)` + +## Requirements + +- Python 3.10+ +- `cryptography >= 41` (Ed25519 support) +- `PyJWT >= 2.8` (EdDSA algorithm) + +## Running Tests + +```bash +cd sdk/python +pip install -e ".[dev]" +pytest -v +``` diff --git a/sdk/python/pyproject.toml b/sdk/python/pyproject.toml new file mode 100644 index 0000000..dabfa42 --- /dev/null +++ b/sdk/python/pyproject.toml @@ -0,0 +1,24 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "open-agent-trust" +version = "0.1.0" +description = "Python SDK for the Open Agent Trust Registry — verify agent attestation tokens locally" +readme = "README.md" +license = { text = "MIT" } +requires-python = ">=3.10" +dependencies = [ + "cryptography>=41.0", + "PyJWT>=2.8", +] + +[project.optional-dependencies] +dev = ["pytest>=7.0", "hatchling"] + +[tool.hatch.build.targets.wheel] +packages = ["src/open_agent_trust"] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/sdk/python/src/open_agent_trust/__init__.py b/sdk/python/src/open_agent_trust/__init__.py new file mode 100644 index 0000000..f0e8e1d --- /dev/null +++ b/sdk/python/src/open_agent_trust/__init__.py @@ -0,0 +1,49 @@ +""" +Open Agent Trust Registry — Python SDK + +Verify agent attestation tokens locally against the registry manifest. +No network calls required per verification (pure local computation, <1ms). + +Usage:: + + from open_agent_trust import verify_attestation, RegistryManifest, RevocationList + + manifest = RegistryManifest.from_dict(manifest_json) + revocations = RevocationList.from_dict(revocations_json) + + result = verify_attestation(token, manifest, revocations, audience="https://your-api.com") + if result.valid: + scopes = result.claims.scope +""" + +from .types import ( + AttestationClaims, + IssuerCapabilities, + IssuerEndpoints, + IssuerEntry, + PublicKey, + RegistryManifest, + RegistrySignature, + RevocationList, + RevokedIssuer, + RevokedKey, + VerificationResult, +) +from .verify import verify_attestation + +__all__ = [ + "verify_attestation", + "VerificationResult", + "AttestationClaims", + "RegistryManifest", + "RevocationList", + "IssuerEntry", + "PublicKey", + "IssuerCapabilities", + "IssuerEndpoints", + "RegistrySignature", + "RevokedKey", + "RevokedIssuer", +] + +__version__ = "0.1.0" diff --git a/sdk/python/src/open_agent_trust/types.py b/sdk/python/src/open_agent_trust/types.py new file mode 100644 index 0000000..e152a61 --- /dev/null +++ b/sdk/python/src/open_agent_trust/types.py @@ -0,0 +1,229 @@ +""" +Type definitions for the Open Agent Trust Registry Python SDK. + +Mirrors sdk/typescript/src/types/ for API parity. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Literal, Optional + + +# --- Registry types --- + +KeyAlgorithm = Literal["Ed25519", "ECDSA-P256"] +KeyStatus = Literal["active", "deprecated", "revoked"] +IssuerStatus = Literal["active", "suspended", "revoked"] + +VerificationReasonCode = Literal[ + "unknown_issuer", + "revoked_issuer", + "suspended_issuer", + "unknown_key", + "revoked_key", + "grace_period_expired", + "expired_attestation", + "invalid_signature", + "audience_mismatch", + "nonce_mismatch", +] + + +@dataclass +class PublicKey: + kid: str + algorithm: KeyAlgorithm + public_key: str # Base64url-encoded Ed25519 x coordinate + status: KeyStatus + issued_at: str + expires_at: str + deprecated_at: Optional[str] + revoked_at: Optional[str] + + @classmethod + def from_dict(cls, d: dict) -> "PublicKey": + return cls( + kid=d["kid"], + algorithm=d["algorithm"], + public_key=d["public_key"], + status=d["status"], + issued_at=d["issued_at"], + expires_at=d["expires_at"], + deprecated_at=d.get("deprecated_at"), + revoked_at=d.get("revoked_at"), + ) + + +@dataclass +class IssuerCapabilities: + supervision_model: str + audit_logging: bool + immutable_audit: bool + attestation_format: str + max_attestation_ttl_seconds: int + capabilities_verified: bool = False + + @classmethod + def from_dict(cls, d: dict) -> "IssuerCapabilities": + return cls( + supervision_model=d["supervision_model"], + audit_logging=d["audit_logging"], + immutable_audit=d["immutable_audit"], + attestation_format=d["attestation_format"], + max_attestation_ttl_seconds=d["max_attestation_ttl_seconds"], + capabilities_verified=d.get("capabilities_verified", False), + ) + + +@dataclass +class IssuerEndpoints: + attestation_verify: Optional[str] = None + revocation_list: Optional[str] = None + + @classmethod + def from_dict(cls, d: dict) -> "IssuerEndpoints": + return cls( + attestation_verify=d.get("attestation_verify"), + revocation_list=d.get("revocation_list"), + ) + + +@dataclass +class IssuerEntry: + issuer_id: str + display_name: str + website: str + security_contact: str + status: IssuerStatus + added_at: str + last_verified: str + public_keys: list[PublicKey] + capabilities: IssuerCapabilities + endpoints: Optional[IssuerEndpoints] = None + + @classmethod + def from_dict(cls, d: dict) -> "IssuerEntry": + return cls( + issuer_id=d["issuer_id"], + display_name=d["display_name"], + website=d["website"], + security_contact=d["security_contact"], + status=d["status"], + added_at=d["added_at"], + last_verified=d["last_verified"], + public_keys=[PublicKey.from_dict(k) for k in d["public_keys"]], + capabilities=IssuerCapabilities.from_dict(d["capabilities"]), + endpoints=IssuerEndpoints.from_dict(d["endpoints"]) if d.get("endpoints") else None, + ) + + +@dataclass +class RegistrySignature: + algorithm: KeyAlgorithm + kid: str + value: str + + @classmethod + def from_dict(cls, d: dict) -> "RegistrySignature": + return cls(algorithm=d["algorithm"], kid=d["kid"], value=d["value"]) + + +@dataclass +class RegistryManifest: + schema_version: str + registry_id: str + generated_at: str + expires_at: str + entries: list[IssuerEntry] + signature: RegistrySignature + + @classmethod + def from_dict(cls, d: dict) -> "RegistryManifest": + return cls( + schema_version=d["schema_version"], + registry_id=d["registry_id"], + generated_at=d["generated_at"], + expires_at=d["expires_at"], + entries=[IssuerEntry.from_dict(e) for e in d["entries"]], + signature=RegistrySignature.from_dict(d["signature"]), + ) + + +@dataclass +class RevokedKey: + issuer_id: str + kid: str + revoked_at: str + reason: str + + @classmethod + def from_dict(cls, d: dict) -> "RevokedKey": + return cls( + issuer_id=d["issuer_id"], + kid=d["kid"], + revoked_at=d["revoked_at"], + reason=d["reason"], + ) + + +@dataclass +class RevokedIssuer: + issuer_id: str + revoked_at: str + reason: str + + @classmethod + def from_dict(cls, d: dict) -> "RevokedIssuer": + return cls( + issuer_id=d["issuer_id"], + revoked_at=d["revoked_at"], + reason=d["reason"], + ) + + +@dataclass +class RevocationList: + schema_version: str + generated_at: str + expires_at: str + revoked_keys: list[RevokedKey] + revoked_issuers: list[RevokedIssuer] + signature: RegistrySignature + + @classmethod + def from_dict(cls, d: dict) -> "RevocationList": + return cls( + schema_version=d["schema_version"], + generated_at=d["generated_at"], + expires_at=d["expires_at"], + revoked_keys=[RevokedKey.from_dict(k) for k in d["revoked_keys"]], + revoked_issuers=[RevokedIssuer.from_dict(i) for i in d["revoked_issuers"]], + signature=RegistrySignature.from_dict(d["signature"]), + ) + + +# --- Attestation types --- + +AgnosticConstraints = dict[str, Any] + + +@dataclass +class AttestationClaims: + sub: str + aud: str + iat: int + exp: int + scope: list[str] + constraints: AgnosticConstraints + user_pseudonym: str + runtime_version: str + nonce: Optional[str] = None + + +@dataclass +class VerificationResult: + valid: bool + reason: Optional[VerificationReasonCode] = None + issuer: Optional[IssuerEntry] = None + claims: Optional[AttestationClaims] = None diff --git a/sdk/python/src/open_agent_trust/verify.py b/sdk/python/src/open_agent_trust/verify.py new file mode 100644 index 0000000..704ef96 --- /dev/null +++ b/sdk/python/src/open_agent_trust/verify.py @@ -0,0 +1,206 @@ +""" +14-step Verification Protocol implementation for the Open Agent Trust Registry. + +Mirrors sdk/typescript/src/verify.ts — pure local computation, no network calls. +Target: <1ms per verification on commodity hardware. + +Reference: spec/03-verification.md +""" + +from __future__ import annotations + +import base64 +from datetime import datetime, timezone +from typing import Optional + +import jwt as pyjwt +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + +from .types import ( + AttestationClaims, + IssuerEntry, + RegistryManifest, + RevocationList, + VerificationResult, +) + +# Grace period for deprecated keys: 90 days (spec/04-key-rotation.md) +GRACE_PERIOD_SECONDS = 90 * 24 * 60 * 60 + + +def _base64url_decode(value: str) -> bytes: + """Decode a base64url string, adding padding as required.""" + # Add padding so len is a multiple of 4 + padding = 4 - len(value) % 4 + if padding != 4: + value += "=" * padding + return base64.urlsafe_b64decode(value) + + +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + +def _parse_iso(ts: str) -> datetime: + """Parse an ISO-8601 timestamp, always returning a UTC-aware datetime.""" + dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + +def verify_attestation( + attestation_jws: str, + manifest: RegistryManifest, + revocations: RevocationList, + expected_audience: str, + expected_nonce: Optional[str] = None, + now: Optional[datetime] = None, +) -> VerificationResult: + """ + Execute the 14-step Verification Protocol to assess an agent attestation. + + Operates purely locally in <1ms without any network calls. + + Args: + attestation_jws: The compact JWS (JWT) attestation token. + manifest: The locally cached registry manifest. + revocations: The locally cached revocation list. + expected_audience: The service origin that should match the ``aud`` claim. + expected_nonce: Optional per-session nonce to check against ``nonce`` claim. + now: Override the current time (useful for testing with fixed vectors). + + Returns: + A :class:`VerificationResult` with ``valid=True`` on success, or + ``valid=False`` and an appropriate ``reason`` code on failure. + """ + current_time = now or _utcnow() + + try: + # Steps 1 & 2: Parse JWS and extract headers. + # PyJWT requires options={"verify_signature": False} to inspect headers + # before we have the public key. + try: + header = pyjwt.get_unverified_header(attestation_jws) + except pyjwt.exceptions.DecodeError: + return VerificationResult(valid=False, reason="invalid_signature") + + issuer_id: Optional[str] = header.get("iss") + kid: Optional[str] = header.get("kid") + alg: Optional[str] = header.get("alg") + + if not issuer_id or not kid or alg != "EdDSA": + return VerificationResult(valid=False, reason="invalid_signature") + + # Fast-reject: explicit revocation list check (5-min cache). + is_key_revoked = any( + k.kid == kid and k.issuer_id == issuer_id + for k in revocations.revoked_keys + ) + is_issuer_revoked_fast = any( + i.issuer_id == issuer_id for i in revocations.revoked_issuers + ) + + if is_key_revoked: + return VerificationResult(valid=False, reason="revoked_key") + if is_issuer_revoked_fast: + return VerificationResult(valid=False, reason="revoked_issuer") + + # Step 3: Look up issuer in manifest. + issuer: Optional[IssuerEntry] = next( + (e for e in manifest.entries if e.issuer_id == issuer_id), None + ) + + # Step 4: Unknown issuer. + if issuer is None: + return VerificationResult(valid=False, reason="unknown_issuer") + + # Step 5: Issuer status check. + if issuer.status == "suspended": + return VerificationResult(valid=False, reason="suspended_issuer", issuer=issuer) + if issuer.status == "revoked": + return VerificationResult(valid=False, reason="revoked_issuer", issuer=issuer) + + # Step 6: Locate key by kid. + key = next((k for k in issuer.public_keys if k.kid == kid), None) + + # Step 7: Unknown key. + if key is None: + return VerificationResult(valid=False, reason="unknown_key", issuer=issuer) + + # Step 8: Revoked key status check. + if key.status == "revoked": + return VerificationResult(valid=False, reason="revoked_key", issuer=issuer) + + # Step 9: Grace period enforcement for deprecated keys. + if key.status == "deprecated": + if not key.deprecated_at: + # spec/09a: missing deprecated_at is a data integrity error → REJECT + return VerificationResult(valid=False, reason="grace_period_expired", issuer=issuer) + deprecated_at = _parse_iso(key.deprecated_at) + elapsed = (current_time - deprecated_at).total_seconds() + if elapsed > GRACE_PERIOD_SECONDS: + return VerificationResult(valid=False, reason="grace_period_expired", issuer=issuer) + # Step 9c: within grace period — continue (caller may log a warning) + + # Step 10: Check key expiration against current date. + key_expiry = _parse_iso(key.expires_at) + if current_time > key_expiry: + return VerificationResult(valid=False, reason="invalid_signature", issuer=issuer) + + # Steps 11 & 12: Cryptographically verify the signature. + # The public_key field is the base64url-encoded raw 32-byte Ed25519 x coordinate. + try: + raw_public_key_bytes = _base64url_decode(key.public_key) + ed_public_key = Ed25519PublicKey.from_public_bytes(raw_public_key_bytes) + except (ValueError, Exception): + return VerificationResult(valid=False, reason="invalid_signature", issuer=issuer) + + # Decode the JWT, verifying signature and standard claims (exp, aud). + # PyJWT's Ed25519 algorithm wraps cryptography internally. + try: + payload = pyjwt.decode( + attestation_jws, + ed_public_key, + algorithms=["EdDSA"], + audience=expected_audience, + options={"verify_iat": False}, # iat checked by jose implicitly; we skip + leeway=0, + ) + except pyjwt.exceptions.ExpiredSignatureError: + return VerificationResult(valid=False, reason="expired_attestation", issuer=issuer) + except pyjwt.exceptions.InvalidAudienceError: + return VerificationResult(valid=False, reason="audience_mismatch", issuer=issuer) + except pyjwt.exceptions.InvalidSignatureError: + return VerificationResult(valid=False, reason="invalid_signature", issuer=issuer) + except pyjwt.exceptions.DecodeError: + return VerificationResult(valid=False, reason="invalid_signature", issuer=issuer) + + # Step 13: Additional claim checks. + + # Nonce check (replay prevention within a session). + if expected_nonce is not None and payload.get("nonce") != expected_nonce: + return VerificationResult(valid=False, reason="nonce_mismatch", issuer=issuer) + + # Explicit audience re-check (belt-and-suspenders; PyJWT should have caught this). + if payload.get("aud") != expected_audience: + return VerificationResult(valid=False, reason="audience_mismatch", issuer=issuer) + + # Step 14: All checks passed — build claims and return success. + claims = AttestationClaims( + sub=payload.get("sub", ""), + aud=payload.get("aud", ""), + iat=payload.get("iat", 0), + exp=payload.get("exp", 0), + scope=payload.get("scope", []), + constraints=payload.get("constraints", {}), + user_pseudonym=payload.get("user_pseudonym", ""), + runtime_version=payload.get("runtime_version", ""), + nonce=payload.get("nonce"), + ) + return VerificationResult(valid=True, issuer=issuer, claims=claims) + + except Exception: + # Catch-all: malformed JWS or unexpected parsing error. + return VerificationResult(valid=False, reason="invalid_signature") diff --git a/sdk/python/tests/test_verify.py b/sdk/python/tests/test_verify.py new file mode 100644 index 0000000..a97c6d1 --- /dev/null +++ b/sdk/python/tests/test_verify.py @@ -0,0 +1,604 @@ +""" +Tests for the Open Agent Trust Registry Python SDK — verify_attestation(). + +Mirrors sdk/typescript/src/verify.test.ts for API parity. +Covers all acceptance criteria from Issue #15: + - valid attestation + - expired attestation + - unknown issuer + - tampered manifest / invalid signature +""" + +from __future__ import annotations + +import base64 +import time +from datetime import datetime, timedelta, timezone +from typing import Any, Optional + +import jwt as pyjwt +import pytest +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) + +import sys, os +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from open_agent_trust import ( + RegistryManifest, + RevocationList, + verify_attestation, +) +from open_agent_trust.types import ( + IssuerCapabilities, + IssuerEntry, + PublicKey, + RegistrySignature, + RevokedIssuer, + RevokedKey, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _b64url(raw: bytes) -> str: + """Encode raw bytes as base64url *without* padding.""" + return base64.urlsafe_b64encode(raw).rstrip(b"=").decode() + + +def _make_keypair() -> tuple[Ed25519PrivateKey, str]: + """Generate an Ed25519 keypair, return (private_key, base64url_x).""" + private_key = Ed25519PrivateKey.generate() + public_key = private_key.public_key() + raw_bytes = public_key.public_bytes_raw() # 32 bytes + return private_key, _b64url(raw_bytes) + + +def _sign_token( + private_key: Ed25519PrivateKey, + iss: str, + kid: str, + aud: str, + exp_offset_seconds: int = 3600, + nonce: Optional[str] = None, +) -> str: + """Create a signed EdDSA JWT matching the expected attestation format.""" + now = int(time.time()) + payload: dict[str, Any] = { + "sub": "agent-123", + "aud": aud, + "iat": now, + "exp": now + exp_offset_seconds, + "scope": ["read"], + "constraints": {"max": 10}, + "user_pseudonym": "user-xyz", + "runtime_version": "1.0", + } + if nonce is not None: + payload["nonce"] = nonce + + return pyjwt.encode( + payload, + private_key, + algorithm="EdDSA", + headers={"kid": kid, "iss": iss, "typ": "agent-attestation+jwt"}, + ) + + +def _make_public_key_entry( + kid: str, + x: str, + status: str = "active", + expires_delta_days: int = 1, + deprecated_at: Optional[str] = None, + issued_delta_days: int = -1, +) -> PublicKey: + now = datetime.now(timezone.utc) + return PublicKey( + kid=kid, + algorithm="Ed25519", + public_key=x, + status=status, # type: ignore[arg-type] + issued_at=(now + timedelta(days=issued_delta_days)).isoformat(), + expires_at=(now + timedelta(days=expires_delta_days)).isoformat(), + deprecated_at=deprecated_at, + revoked_at=None, + ) + + +def _base_capabilities() -> IssuerCapabilities: + return IssuerCapabilities( + supervision_model="none", + audit_logging=False, + immutable_audit=False, + attestation_format="jwt", + max_attestation_ttl_seconds=3600, + capabilities_verified=False, + ) + + +def _make_issuer( + issuer_id: str, + status: str, + public_keys: list[PublicKey], +) -> IssuerEntry: + now = datetime.now(timezone.utc).isoformat() + return IssuerEntry( + issuer_id=issuer_id, + display_name=f"{issuer_id} Display", + website="https://example.com", + security_contact="sec@example.com", + status=status, # type: ignore[arg-type] + added_at=now, + last_verified=now, + public_keys=public_keys, + capabilities=_base_capabilities(), + ) + + +def _empty_revocations() -> RevocationList: + now = datetime.now(timezone.utc) + sig = RegistrySignature(algorithm="Ed25519", kid="root", value="placeholder") + return RevocationList( + schema_version="1.0.0", + generated_at=now.isoformat(), + expires_at=(now + timedelta(days=1)).isoformat(), + revoked_keys=[], + revoked_issuers=[], + signature=sig, + ) + + +def _make_manifest(entries: list[IssuerEntry]) -> RegistryManifest: + now = datetime.now(timezone.utc) + sig = RegistrySignature(algorithm="Ed25519", kid="root", value="placeholder") + return RegistryManifest( + schema_version="1.0.0", + registry_id="open-trust-registry", + generated_at=now.isoformat(), + expires_at=(now + timedelta(days=1)).isoformat(), + entries=entries, + signature=sig, + ) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="module") +def keypairs(): + """Generate three Ed25519 keypairs: valid, revoked, expired.""" + valid_priv, valid_x = _make_keypair() + revoked_priv, revoked_x = _make_keypair() + expired_priv, expired_x = _make_keypair() + return { + "valid": (valid_priv, valid_x), + "revoked": (revoked_priv, revoked_x), + "expired": (expired_priv, expired_x), + } + + +@pytest.fixture(scope="module") +def manifest(keypairs): + valid_priv, valid_x = keypairs["valid"] + revoked_priv, revoked_x = keypairs["revoked"] + expired_priv, expired_x = keypairs["expired"] + + now = datetime.now(timezone.utc) + + valid_issuer = _make_issuer( + "valid-issuer", + "active", + [ + _make_public_key_entry("valid-key-1", valid_x), + # expired registry key (expires_at yesterday) + PublicKey( + kid="expired-registry-key-1", + algorithm="Ed25519", + public_key=expired_x, + status="active", + issued_at=(now - timedelta(days=2)).isoformat(), + expires_at=(now - timedelta(days=1)).isoformat(), # expired + deprecated_at=None, + revoked_at=None, + ), + ], + ) + + revoked_issuer = _make_issuer( + "revoked-issuer", + "revoked", + [ + _make_public_key_entry("revoked-key-1", revoked_x, status="revoked"), + ], + ) + + return _make_manifest([valid_issuer, revoked_issuer]) + + +@pytest.fixture(scope="module") +def revocations(): + now = datetime.now(timezone.utc) + sig = RegistrySignature(algorithm="Ed25519", kid="root", value="placeholder") + return RevocationList( + schema_version="1.0.0", + generated_at=now.isoformat(), + expires_at=(now + timedelta(days=1)).isoformat(), + revoked_keys=[], + revoked_issuers=[ + RevokedIssuer( + issuer_id="revoked-issuer", + revoked_at=now.isoformat(), + reason="policy_violation", + ) + ], + signature=sig, + ) + + +# --------------------------------------------------------------------------- +# Tests — happy path +# --------------------------------------------------------------------------- + +def test_verifies_valid_token(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token(priv, "valid-issuer", "valid-key-1", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is True + assert result.issuer is not None + assert result.issuer.issuer_id == "valid-issuer" + assert result.claims is not None + assert result.claims.sub == "agent-123" + + +def test_verifies_valid_token_with_nonce(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token( + priv, "valid-issuer", "valid-key-1", "https://api.service.com", nonce="nonce-abc" + ) + result = verify_attestation( + token, manifest, revocations, "https://api.service.com", expected_nonce="nonce-abc" + ) + + assert result.valid is True + + +# --------------------------------------------------------------------------- +# Tests — issuer rejection +# --------------------------------------------------------------------------- + +def test_rejects_unknown_issuer(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token(priv, "fake-issuer", "valid-key-1", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "unknown_issuer" + + +def test_rejects_revoked_issuer(keypairs, manifest, revocations): + priv, _ = keypairs["revoked"] + token = _sign_token(priv, "revoked-issuer", "revoked-key-1", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "revoked_issuer" + + +def test_rejects_suspended_issuer(keypairs, revocations): + priv, x = _make_keypair() + suspended_issuer = _make_issuer( + "suspended-issuer", + "suspended", + [_make_public_key_entry("key-1", x)], + ) + manifest = _make_manifest([suspended_issuer]) + token = _sign_token(priv, "suspended-issuer", "key-1", "https://api.service.com") + result = verify_attestation(token, manifest, _empty_revocations(), "https://api.service.com") + + assert result.valid is False + assert result.reason == "suspended_issuer" + + +# --------------------------------------------------------------------------- +# Tests — key rejection +# --------------------------------------------------------------------------- + +def test_rejects_unknown_key(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token(priv, "valid-issuer", "fake-key", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "unknown_key" + + +def test_rejects_revoked_key_via_manifest(keypairs, revocations): + """Key with status='revoked' in the manifest entry itself (Step 8).""" + priv, x = _make_keypair() + issuer = _make_issuer( + "issuer-with-revoked-key", + "active", + [_make_public_key_entry("revoked-k", x, status="revoked")], + ) + manifest = _make_manifest([issuer]) + token = _sign_token(priv, "issuer-with-revoked-key", "revoked-k", "https://api.service.com") + result = verify_attestation(token, manifest, _empty_revocations(), "https://api.service.com") + + assert result.valid is False + assert result.reason == "revoked_key" + + +def test_rejects_revoked_key_via_revocations_list(keypairs): + """Key on the fast-path revocations list (Step 2 fast reject).""" + priv, x = _make_keypair() + issuer = _make_issuer( + "valid-issuer-2", + "active", + [_make_public_key_entry("my-key", x)], + ) + manifest = _make_manifest([issuer]) + + now = datetime.now(timezone.utc) + sig = RegistrySignature(algorithm="Ed25519", kid="root", value="placeholder") + revocations = RevocationList( + schema_version="1.0.0", + generated_at=now.isoformat(), + expires_at=(now + timedelta(days=1)).isoformat(), + revoked_keys=[ + RevokedKey( + issuer_id="valid-issuer-2", + kid="my-key", + revoked_at=now.isoformat(), + reason="compromise", + ) + ], + revoked_issuers=[], + signature=sig, + ) + + token = _sign_token(priv, "valid-issuer-2", "my-key", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "revoked_key" + + +# --------------------------------------------------------------------------- +# Tests — signature & crypto +# --------------------------------------------------------------------------- + +def test_rejects_invalid_signature(keypairs, manifest, revocations): + """Sign with the revoked keypair but claim the valid-issuer/valid-key identity.""" + wrong_priv, _ = keypairs["revoked"] + token = _sign_token(wrong_priv, "valid-issuer", "valid-key-1", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "invalid_signature" + + +def test_rejects_tampered_token(keypairs, manifest, revocations): + """Mutate one character in the payload segment — signature must fail.""" + priv, _ = keypairs["valid"] + token = _sign_token(priv, "valid-issuer", "valid-key-1", "https://api.service.com") + parts = token.split(".") + # Flip one char in the payload + flipped = parts[1][:-1] + ("A" if parts[1][-1] != "A" else "B") + tampered = ".".join([parts[0], flipped, parts[2]]) + + result = verify_attestation(tampered, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "invalid_signature" + + +def test_rejects_malformed_jws(manifest, revocations): + result = verify_attestation("not.a.jwt", manifest, revocations, "https://api.service.com") + assert result.valid is False + assert result.reason == "invalid_signature" + + +# --------------------------------------------------------------------------- +# Tests — time-based rejections +# --------------------------------------------------------------------------- + +def test_rejects_expired_attestation_token(keypairs, manifest, revocations): + """JWT exp is 1 hour in the past.""" + priv, _ = keypairs["valid"] + token = _sign_token( + priv, "valid-issuer", "valid-key-1", "https://api.service.com", exp_offset_seconds=-3600 + ) + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "expired_attestation" + + +def test_rejects_expired_registry_key(keypairs, manifest, revocations): + """Registry public key has expires_at in the past (Step 10).""" + priv, _ = keypairs["expired"] + # The manifest fixture includes "expired-registry-key-1" which expired yesterday + token = _sign_token( + priv, "valid-issuer", "expired-registry-key-1", "https://api.service.com" + ) + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "invalid_signature" + + +def test_rejects_deprecated_key_past_grace_period(): + """deprecated_at 91 days ago — beyond 90-day grace period.""" + priv, x = _make_keypair() + now = datetime.now(timezone.utc) + deprecated_at = (now - timedelta(days=91)).isoformat() + + issuer = _make_issuer( + "issuer-deprecated", + "active", + [ + PublicKey( + kid="deprecated-k", + algorithm="Ed25519", + public_key=x, + status="deprecated", + issued_at=(now - timedelta(days=180)).isoformat(), + expires_at=(now + timedelta(days=1)).isoformat(), + deprecated_at=deprecated_at, + revoked_at=None, + ) + ], + ) + manifest = _make_manifest([issuer]) + token = _sign_token(priv, "issuer-deprecated", "deprecated-k", "https://api.service.com") + result = verify_attestation(token, manifest, _empty_revocations(), "https://api.service.com") + + assert result.valid is False + assert result.reason == "grace_period_expired" + + +def test_accepts_deprecated_key_within_grace_period(): + """deprecated_at 30 days ago — still within 90-day grace period.""" + priv, x = _make_keypair() + now = datetime.now(timezone.utc) + deprecated_at = (now - timedelta(days=30)).isoformat() + + issuer = _make_issuer( + "issuer-deprecated-ok", + "active", + [ + PublicKey( + kid="deprecated-grace-k", + algorithm="Ed25519", + public_key=x, + status="deprecated", + issued_at=(now - timedelta(days=60)).isoformat(), + expires_at=(now + timedelta(days=1)).isoformat(), + deprecated_at=deprecated_at, + revoked_at=None, + ) + ], + ) + manifest = _make_manifest([issuer]) + token = _sign_token( + priv, "issuer-deprecated-ok", "deprecated-grace-k", "https://api.service.com" + ) + result = verify_attestation(token, manifest, _empty_revocations(), "https://api.service.com") + + assert result.valid is True + + +# --------------------------------------------------------------------------- +# Tests — claim validation +# --------------------------------------------------------------------------- + +def test_rejects_audience_mismatch(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token(priv, "valid-issuer", "valid-key-1", "https://other-service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is False + assert result.reason == "audience_mismatch" + + +def test_rejects_nonce_mismatch(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token( + priv, "valid-issuer", "valid-key-1", "https://api.service.com", nonce="nonce-123" + ) + result = verify_attestation( + token, manifest, revocations, "https://api.service.com", expected_nonce="nonce-999" + ) + + assert result.valid is False + assert result.reason == "nonce_mismatch" + + +def test_no_nonce_check_when_not_expected(keypairs, manifest, revocations): + """Token carries a nonce but service doesn't require one — should still pass.""" + priv, _ = keypairs["valid"] + token = _sign_token( + priv, "valid-issuer", "valid-key-1", "https://api.service.com", nonce="nonce-xyz" + ) + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is True + + +# --------------------------------------------------------------------------- +# Tests — claims extraction +# --------------------------------------------------------------------------- + +def test_claims_are_populated_on_success(keypairs, manifest, revocations): + priv, _ = keypairs["valid"] + token = _sign_token(priv, "valid-issuer", "valid-key-1", "https://api.service.com") + result = verify_attestation(token, manifest, revocations, "https://api.service.com") + + assert result.valid is True + assert result.claims is not None + assert result.claims.sub == "agent-123" + assert result.claims.aud == "https://api.service.com" + assert result.claims.scope == ["read"] + assert result.claims.constraints == {"max": 10} + assert result.claims.user_pseudonym == "user-xyz" + assert result.claims.runtime_version == "1.0" + + +# --------------------------------------------------------------------------- +# Tests — RegistryManifest.from_dict round-trip +# --------------------------------------------------------------------------- + +def test_manifest_from_dict(): + """RegistryManifest.from_dict parses a raw JSON-like dict correctly.""" + now = datetime.now(timezone.utc) + raw = { + "schema_version": "1.0.0", + "registry_id": "test", + "generated_at": now.isoformat(), + "expires_at": (now + timedelta(days=1)).isoformat(), + "signature": {"algorithm": "Ed25519", "kid": "root", "value": "sig"}, + "entries": [ + { + "issuer_id": "test-issuer", + "display_name": "Test", + "website": "https://test.com", + "security_contact": "sec@test.com", + "status": "active", + "added_at": now.isoformat(), + "last_verified": now.isoformat(), + "capabilities": { + "supervision_model": "none", + "audit_logging": False, + "immutable_audit": False, + "attestation_format": "jwt", + "max_attestation_ttl_seconds": 3600, + "capabilities_verified": True, + }, + "public_keys": [ + { + "kid": "k1", + "algorithm": "Ed25519", + "public_key": "abc123", + "status": "active", + "issued_at": now.isoformat(), + "expires_at": (now + timedelta(days=1)).isoformat(), + "deprecated_at": None, + "revoked_at": None, + } + ], + } + ], + } + m = RegistryManifest.from_dict(raw) + assert m.registry_id == "test" + assert len(m.entries) == 1 + assert m.entries[0].issuer_id == "test-issuer" + assert m.entries[0].capabilities.capabilities_verified is True + assert m.entries[0].public_keys[0].kid == "k1"