From 8604d6c52dba9b09ca322426c2c581cc957f5f56 Mon Sep 17 00:00:00 2001 From: Ancient Runner <7602667+wallscaler@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:50:01 -0400 Subject: [PATCH 1/4] Add scalable SAT snapshot miner path --- deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md | 19 ++ publisher_verify.py | 46 +++++ scaffold/publisher/app.py | 242 ++++++++++++++++++++++- 3 files changed, 306 insertions(+), 1 deletion(-) diff --git a/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md b/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md index 3095951f9..cf53138da 100644 --- a/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md +++ b/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md @@ -703,3 +703,22 @@ Acceptance: This slice moves Cathedral away from hot database reads without changing SAT scoring or miner submit semantics. + +## Current Compatibility Slice + +The first mergeable slice keeps existing miner and validator URLs working while +adding the scalable read contract: + +- `GET /sat/latest.json` returns a signed latest pointer with sequence, hashes, + sizes, and artifact URLs. +- `GET /sat/sequences/{sequence}/board.json` returns the current board snapshot + for the active sequence. +- `GET /sat/sequences/{sequence}/weights.json` returns the current signed + weights snapshot for the active sequence. +- `GET /sat/events` streams SSE hints containing only sequence and latest URL. +- `GET /v1/agents/receipts/{receipt_id}` returns durable submit status without + scraping recent activity. + +This is deliberately a transition layer: it does not yet publish historical +immutable object storage, CNF artifacts, transactional outbox rows, or async +verification workers. diff --git a/publisher_verify.py b/publisher_verify.py index 30d15f1ae..374be8290 100644 --- a/publisher_verify.py +++ b/publisher_verify.py @@ -250,6 +250,40 @@ def ck(name: str, cond: bool) -> None: headers={"If-None-Match": bc.headers.get("etag", "")}, ) ck("challenge-broadcast supports ETag 304", bc_304.status_code == 304) + latest = client.get("/sat/latest.json") + latest_json = latest.json() + latest_etag = latest.headers.get("etag", "") + latest_seq = str(latest_json.get("sequence")) + ck("sat latest pointer exposes signed snapshot artifacts", + latest.status_code == 200 + and latest_json.get("schema") == "cathedral.sat.latest.v1" + and latest_json.get("signature") + and latest_json.get("artifacts", {}).get("board", {}).get("url") + == f"/sat/sequences/{latest_seq}/board.json" + and latest_json.get("artifacts", {}).get("weights", {}).get("url") + == f"/sat/sequences/{latest_seq}/weights.json" + and latest_json.get("artifacts", {}).get("board", {}).get("hash", "").startswith("sha256:")) + latest_304 = client.get("/sat/latest.json", headers={"If-None-Match": latest_etag}) + ck("sat latest pointer supports ETag 304", latest_304.status_code == 304) + sat_board = client.get(f"/sat/sequences/{latest_seq}/board.json") + ck("sat board snapshot matches active-challenges", + sat_board.status_code == 200 + and sat_board.json()["items"][0]["challenge_id"] == ac["items"][0]["challenge_id"] + and sat_board.headers.get("x-cathedral-sequence") == latest_seq) + sat_weights = client.get(f"/sat/sequences/{latest_seq}/weights.json") + ck("sat weights snapshot preserves signed vector shape", + sat_weights.status_code == 200 + and sat_weights.json().get("signature") + and isinstance(sat_weights.json().get("weights"), list) + and sat_weights.headers.get("x-cathedral-sequence") == latest_seq) + stale_snapshot = client.get("/sat/sequences/stale-test-sequence/board.json") + ck("sat stale sequence fails closed", stale_snapshot.status_code == 404) + sat_events = client.get("/sat/events", params={"once": "true"}) + ck("sat events emits latest pointer hint only", + sat_events.status_code == 200 + and "event: cathedral.sat.snapshot" in sat_events.text + and '"latest_url":"/sat/latest.json"' in sat_events.text + and '"sequence":"' in sat_events.text) old_rpm = os.environ.get("CATHEDRAL_RATELIMIT_RPM") os.environ["CATHEDRAL_RATELIMIT_RPM"] = "1" try: @@ -730,6 +764,18 @@ def _offer_sig(body: dict, *, when: str): ck("first solve gets open-window rank 1", resp.json().get("solve_rank") == 1) ck("submit row emits base weighted_score 1.0", resp.json().get("weighted_score") == 1.0) + receipt = client.get(f"/v1/agents/receipts/{resp.json().get('id')}") + receipt_json = receipt.json() if receipt.status_code == 200 else {} + ck("submit receipt endpoint returns durable accepted status", + receipt.status_code == 200 + and receipt_json.get("schema") == "cathedral.submit_receipt.v1" + and receipt_json.get("challenge_id") == "sat-e2e-1" + and receipt_json.get("miner_hotkey") == miner.ss58_address + and receipt_json.get("status") == "ranked" + and receipt.headers.get("cache-control") == "no-store") + missing_receipt = client.get("/v1/agents/receipts/not-a-real-receipt") + ck("submit receipt endpoint fails closed for missing IDs", + missing_receipt.status_code == 404) explain = client.get( "/v1/leaderboard/explain", params={"miner_hotkey": miner.ss58_address}, diff --git a/scaffold/publisher/app.py b/scaffold/publisher/app.py index 99405386f..56b9d5434 100644 --- a/scaffold/publisher/app.py +++ b/scaffold/publisher/app.py @@ -19,6 +19,8 @@ """ from __future__ import annotations +import asyncio +import base64 import hashlib import hmac import json @@ -30,7 +32,8 @@ from typing import Any from fastapi import Depends, FastAPI, Form, Header, HTTPException, Query, Request, Response -from fastapi.responses import JSONResponse, PlainTextResponse +from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from .. import wire from ..contract import GenerateCtx @@ -438,6 +441,101 @@ def _serve_board_snapshot(request: Request): return Response(status_code=304, headers=headers) return JSONResponse(payload, headers=headers) + def _snapshot_bytes(payload: dict[str, Any]) -> bytes: + return json.dumps( + payload, sort_keys=True, separators=(",", ":"), default=str + ).encode("utf-8") + + def _snapshot_hash(payload: dict[str, Any]) -> str: + return "sha256:" + hashlib.sha256(_snapshot_bytes(payload)).hexdigest() + + def _snapshot_etag(payload: dict[str, Any]) -> str: + return '"' + hashlib.sha256(_snapshot_bytes(payload)).hexdigest() + '"' + + def _sign_latest_pointer(payload: dict[str, Any]) -> dict[str, Any]: + signed = dict(payload) + signed["key_id"] = os.environ.get( + weights_mod.KEY_ID_ENV, "cathedral-weight-policy") + signing_key = os.environ.get(weights_mod.SIGNING_KEY_ENV, "").strip() or key_hex + sk = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(signing_key.strip())) + signed["signature"] = base64.b64encode( + sk.sign(weights_mod.canonical_bytes(signed)) + ).decode() + return signed + + def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, Any], str]: + board_payload, board_etag = board_cache.get() + weight_key = os.environ.get(weights_mod.SIGNING_KEY_ENV, "").strip() or key_hex + weights_payload = weights_mod.current_vector(store, signing_key_hex=weight_key) + board_hash = _snapshot_hash(board_payload) + weights_hash = _snapshot_hash(weights_payload) + policy_version = str(weights_payload.get("policy_version") or int(time.time() * 1000)) + sequence_digest = hashlib.sha256( + f"{policy_version}:{board_hash}:{weights_hash}".encode("utf-8") + ).hexdigest()[:12] + sequence = f"{policy_version}-{sequence_digest}" + created_at = str(weights_payload.get("generated_at") or _now_iso_ms()) + board_url = f"/sat/sequences/{sequence}/board.json" + weights_url = f"/sat/sequences/{sequence}/weights.json" + pointer = { + "schema": "cathedral.sat.latest.v1", + "lane": "sat", + "sequence": sequence, + "created_at": created_at, + "publisher_generation_id": os.environ.get( + "CATHEDRAL_PUBLISHER_GENERATION_ID", "default"), + "storage": "in_process_current_snapshot", + "trust_root": "signed_latest_pointer_and_artifact_hashes", + "artifacts": { + "board": { + "url": board_url, + "content_type": "application/json", + "hash": board_hash, + "size_bytes": len(_snapshot_bytes(board_payload)), + "etag": board_etag, + }, + "weights": { + "url": weights_url, + "content_type": "application/json", + "hash": weights_hash, + "size_bytes": len(_snapshot_bytes(weights_payload)), + "signature": "embedded", + "generated_at": weights_payload.get("generated_at"), + "expires_at": weights_payload.get("expires_at"), + }, + }, + "miner_paths": { + "latest": "/sat/latest.json", + "events": "/sat/events", + "public_board": board_url, + "public_compat": "/v1/synthetic-boolean/active-challenges", + "private_assignments": "/v1/synthetic-boolean/per-miner/challenges", + "private_cnf": "/v1/synthetic-boolean/per-miner/cnf", + "submit": "/v1/agents/submit", + "receipt_status": "/v1/agents/receipts/{receipt_id}", + }, + "compatibility": { + "legacy_read_endpoints_kept": True, + "legacy_validator_weights_kept": True, + "historical_sequence_storage": "not_yet_published", + }, + } + signed_pointer = _sign_latest_pointer(pointer) + return signed_pointer, board_payload, weights_payload, _snapshot_etag(signed_pointer) + + def _sat_snapshot_headers(etag: str, sequence: str, *, immutable: bool = False) -> dict[str, str]: + cache_control = ( + "public, max-age=31536000, immutable" + if immutable + else "public, max-age=5, must-revalidate" + ) + return { + "Cache-Control": cache_control, + "ETag": etag, + "X-Cathedral-Sequence": sequence, + "Access-Control-Allow-Origin": "*", + } + top_cache = top_cache_mod.TopCache() if _role_runs_read_background(service_role): top_cache.start(store) @@ -1894,6 +1992,97 @@ async def challenge_broadcast(request: Request): # broadcast rather than a per-request active-set query. return _serve_board_snapshot(request) + @app.get("/sat/latest.json") + async def sat_latest(request: Request): + latest, _board, _weights, etag = _sat_snapshot_bundle() + headers = _sat_snapshot_headers(etag, str(latest["sequence"])) + inm = request.headers.get("if-none-match") + if inm and etag in [t.strip() for t in inm.split(",")]: + return Response(status_code=304, headers=headers) + return JSONResponse(latest, headers=headers) + + @app.get("/sat/sequences/{sequence}/board.json") + async def sat_sequence_board(sequence: str, request: Request): + latest, board_payload, _weights, _latest_etag = _sat_snapshot_bundle() + current_sequence = str(latest["sequence"]) + if sequence != current_sequence: + raise HTTPException( + 404, + { + "detail": "snapshot_sequence_not_available", + "current_sequence": current_sequence, + }, + ) + etag = _snapshot_etag(board_payload) + headers = _sat_snapshot_headers(etag, sequence, immutable=True) + inm = request.headers.get("if-none-match") + if inm and etag in [t.strip() for t in inm.split(",")]: + return Response(status_code=304, headers=headers) + return JSONResponse(board_payload, headers=headers) + + @app.get("/sat/sequences/{sequence}/weights.json") + async def sat_sequence_weights(sequence: str, request: Request): + latest, _board, weights_payload, _latest_etag = _sat_snapshot_bundle() + current_sequence = str(latest["sequence"]) + if sequence != current_sequence: + raise HTTPException( + 404, + { + "detail": "snapshot_sequence_not_available", + "current_sequence": current_sequence, + }, + ) + etag = _snapshot_etag(weights_payload) + headers = _sat_snapshot_headers(etag, sequence, immutable=True) + inm = request.headers.get("if-none-match") + if inm and etag in [t.strip() for t in inm.split(",")]: + return Response(status_code=304, headers=headers) + return JSONResponse(weights_payload, headers=headers) + + @app.get("/sat/events") + async def sat_events(request: Request, once: bool = Query(False)): + heartbeat_secs = max(1.0, _env_float("CATHEDRAL_SSE_HEARTBEAT_SECS", 30.0)) + + def _snapshot_event_frame(latest: dict[str, Any]) -> str: + event = { + "kind": "cathedral.sat.snapshot.ready", + "sequence": latest["sequence"], + "lane": "sat", + "type": "snapshot", + "latest_url": "/sat/latest.json", + "artifact_hash": _snapshot_hash(latest), + } + return ( + "retry: 30000\n" + f"id: {latest['sequence']}\n" + "event: cathedral.sat.snapshot\n" + f"data: {json.dumps(event, sort_keys=True, separators=(',', ':'))}\n\n" + ) + + async def _stream(): + last_sequence = request.headers.get("last-event-id", "") + while True: + latest, _board, _weights, _latest_etag = _sat_snapshot_bundle() + sequence = str(latest["sequence"]) + if sequence != last_sequence: + yield _snapshot_event_frame(latest) + last_sequence = sequence + else: + yield ": keepalive\n\n" + if once or await request.is_disconnected(): + return + await asyncio.sleep(heartbeat_secs) + + return StreamingResponse( + _stream(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-store", + "X-Accel-Buffering": "no", + "Access-Control-Allow-Origin": "*", + }, + ) + @app.get("/v1/synthetic-boolean/current-challenge") async def current_challenge(tier: int | None = Query(None), difficulty: str | None = Query(None)): if tier is not None and tier < 0: @@ -3601,6 +3790,57 @@ def _accept(conn): "solve_rank": rank, "attestation_status": "pending", } + @app.get("/v1/agents/receipts/{receipt_id}") + async def agent_receipt_status(receipt_id: str): + rows_ = store.query( + "SELECT id, miner_hotkey, sat_challenge_id, status, rejection_reason, " + "current_score, seq_no, submitted_at " + "FROM agent_submissions WHERE id=?", + (receipt_id,), + ) + if rows_: + row = rows_[0] + return JSONResponse( + { + "schema": "cathedral.submit_receipt.v1", + "receipt_id": row["id"], + "source": "agent_submissions", + "miner_hotkey": row["miner_hotkey"], + "challenge_id": row["sat_challenge_id"], + "status": row["status"], + "rejection_reason": row["rejection_reason"], + "current_score": row["current_score"], + "seq_no": row["seq_no"], + "submitted_at": row["submitted_at"], + }, + headers={"Cache-Control": "no-store"}, + ) + pm_rows = store.query( + "SELECT id, challenge_id, miner_hotkey, epoch, status, rejection_reason, " + "dimacs_solution_sha256, submitted_at, recorded_at_iso " + "FROM per_miner_attempts WHERE id=?", + (receipt_id,), + ) + if pm_rows: + row = pm_rows[0] + return JSONResponse( + { + "schema": "cathedral.submit_receipt.v1", + "receipt_id": row["id"], + "source": "per_miner_attempts", + "miner_hotkey": row["miner_hotkey"], + "challenge_id": row["challenge_id"], + "status": row["status"], + "rejection_reason": row["rejection_reason"], + "epoch": row["epoch"], + "dimacs_solution_sha256": row["dimacs_solution_sha256"], + "submitted_at": row["submitted_at"], + "recorded_at": row["recorded_at_iso"], + }, + headers={"Cache-Control": "no-store"}, + ) + raise HTTPException(404, "receipt_not_found") + # ---- M3: Lane S registry ---------------------------------------------- @app.post("/v1/arena/solvers") def arena_register_solver( From 18f2cb3fc71cc5f02a1d745110c4091fca31eca7 Mon Sep 17 00:00:00 2001 From: Ancient Runner <7602667+wallscaler@users.noreply.github.com> Date: Thu, 25 Jun 2026 19:26:57 -0400 Subject: [PATCH 2/4] Allow snapshot routes in split service roles --- publisher_verify.py | 12 ++++++++++++ scaffold/publisher/app.py | 4 ++++ 2 files changed, 16 insertions(+) diff --git a/publisher_verify.py b/publisher_verify.py index 374be8290..ab853c833 100644 --- a/publisher_verify.py +++ b/publisher_verify.py @@ -322,6 +322,14 @@ def ck(name: str, cond: bool) -> None: read_submit.status_code == 404 and read_submit.text == "route_not_served_by_read_role" and read_submit.headers.get("x-cathedral-service-role") == "read") + read_latest = role_client.get("/sat/latest.json") + read_seq = str(read_latest.json().get("sequence")) if read_latest.status_code == 200 else "" + read_snapshot = role_client.get(f"/sat/sequences/{read_seq}/board.json") + read_events = role_client.get("/sat/events", params={"once": "true"}) + ck("read role serves scalable SAT snapshot endpoints", + read_latest.status_code == 200 + and read_snapshot.status_code == 200 + and read_events.status_code == 200) os.environ["CATHEDRAL_SERVICE_ROLE"] = "submit" os.environ["CATHEDRAL_REFILL_ENABLED"] = "false" @@ -339,6 +347,10 @@ def ck(name: str, cond: bool) -> None: submit_cnf = submit_role_client.get("/v1/synthetic-boolean/active-cnf") ck("submit role allows miner CNF route to reach auth validation", submit_cnf.status_code == 422) + submit_receipt = submit_role_client.get("/v1/agents/receipts/not-real") + ck("submit role serves receipt status route", + submit_receipt.status_code == 404 + and submit_receipt.headers.get("x-cathedral-service-role") is None) os.environ["CATHEDRAL_SERVICE_ROLE"] = "worker" worker_role_app = build_app(database_path=":memory:", signing_key_hex=key_hex, diff --git a/scaffold/publisher/app.py b/scaffold/publisher/app.py index 56b9d5434..8ffdf6710 100644 --- a/scaffold/publisher/app.py +++ b/scaffold/publisher/app.py @@ -711,6 +711,8 @@ class _ServiceRoleGuardMiddleware: "/.well-known/cathedral-jwks.json", } _READ_GET_PATHS = { + "/sat/latest.json", + "/sat/events", "/v1/synthetic-boolean/active-challenges", "/v1/synthetic-boolean/challenge-broadcast", "/v1/synthetic-boolean/current-challenge", @@ -722,6 +724,7 @@ class _ServiceRoleGuardMiddleware: "/v1/leaderboard/explain", } _READ_GET_PREFIXES = { + "/sat/sequences/", "/v1/audit-scanner/", } _SUBMIT_GET_PATHS = { @@ -731,6 +734,7 @@ class _ServiceRoleGuardMiddleware: } _SUBMIT_GET_PREFIXES = { "/v1/challenges/", + "/v1/agents/receipts/", } _SUBMIT_POST_PATHS = { "/v1/agents/submit", From 90325ffd777c6cf248214242bcc7b29cd5c5946d Mon Sep 17 00:00:00 2001 From: ancientrunner Date: Thu, 25 Jun 2026 22:09:38 -0400 Subject: [PATCH 3/4] Harden SAT snapshot artifact contract --- deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md | 9 +- publisher_verify.py | 63 ++++++++- scaffold/publisher/app.py | 158 ++++++++++++++++++++--- scaffold/publisher/store.py | 29 +++++ 4 files changed, 239 insertions(+), 20 deletions(-) diff --git a/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md b/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md index cf53138da..b12cf1ee4 100644 --- a/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md +++ b/deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md @@ -712,13 +712,16 @@ adding the scalable read contract: - `GET /sat/latest.json` returns a signed latest pointer with sequence, hashes, sizes, and artifact URLs. - `GET /sat/sequences/{sequence}/board.json` returns the current board snapshot - for the active sequence. + for a recent sequence persisted in the shared publisher database. - `GET /sat/sequences/{sequence}/weights.json` returns the current signed - weights snapshot for the active sequence. + weights snapshot for a recent sequence persisted in the shared publisher + database. - `GET /sat/events` streams SSE hints containing only sequence and latest URL. - `GET /v1/agents/receipts/{receipt_id}` returns durable submit status without scraping recent activity. This is deliberately a transition layer: it does not yet publish historical immutable object storage, CNF artifacts, transactional outbox rows, or async -verification workers. +verification workers. Sequence artifact responses serve byte-exact, +hash-verifiable JSON from a bounded shared-database cache, but they use bounded +cache headers rather than claiming permanent origin availability. diff --git a/publisher_verify.py b/publisher_verify.py index ab853c833..ba0a1925c 100644 --- a/publisher_verify.py +++ b/publisher_verify.py @@ -269,13 +269,26 @@ def ck(name: str, cond: bool) -> None: ck("sat board snapshot matches active-challenges", sat_board.status_code == 200 and sat_board.json()["items"][0]["challenge_id"] == ac["items"][0]["challenge_id"] - and sat_board.headers.get("x-cathedral-sequence") == latest_seq) + and sat_board.headers.get("x-cathedral-sequence") == latest_seq + and sat_board.headers.get("etag") == latest_json["artifacts"]["board"]["etag"]) + ck("sat board snapshot bytes match signed pointer hash and size", + "sha256:" + hashlib.sha256(sat_board.content).hexdigest() + == latest_json["artifacts"]["board"]["hash"] + and len(sat_board.content) == latest_json["artifacts"]["board"]["size_bytes"]) sat_weights = client.get(f"/sat/sequences/{latest_seq}/weights.json") ck("sat weights snapshot preserves signed vector shape", sat_weights.status_code == 200 and sat_weights.json().get("signature") and isinstance(sat_weights.json().get("weights"), list) - and sat_weights.headers.get("x-cathedral-sequence") == latest_seq) + and sat_weights.headers.get("x-cathedral-sequence") == latest_seq + and sat_weights.headers.get("etag") == latest_json["artifacts"]["weights"]["etag"]) + ck("sat weights snapshot bytes match signed pointer hash and size", + "sha256:" + hashlib.sha256(sat_weights.content).hexdigest() + == latest_json["artifacts"]["weights"]["hash"] + and len(sat_weights.content) == latest_json["artifacts"]["weights"]["size_bytes"]) + ck("sat sequence artifacts do not claim durable immutable origin storage yet", + "immutable" not in (sat_board.headers.get("cache-control") or "").lower() + and "31536000" not in (sat_board.headers.get("cache-control") or "")) stale_snapshot = client.get("/sat/sequences/stale-test-sequence/board.json") ck("sat stale sequence fails closed", stale_snapshot.status_code == 404) sat_events = client.get("/sat/events", params={"once": "true"}) @@ -284,6 +297,52 @@ def ck(name: str, cond: bool) -> None: and "event: cathedral.sat.snapshot" in sat_events.text and '"latest_url":"/sat/latest.json"' in sat_events.text and '"sequence":"' in sat_events.text) + import tempfile as _snapshot_tempfile + snapshot_db_path = "" + try: + with _snapshot_tempfile.NamedTemporaryFile(suffix=".sqlite", delete=False) as _snapshot_db: + snapshot_db_path = _snapshot_db.name + snapshot_app_a = build_app( + database_path=snapshot_db_path, + signing_key_hex=key_hex, + submit_min_interval_secs=0, + ) + seed_challenge( + snapshot_app_a.state.store, + challenge_id="sat-snapshot-cross-app", + tier=1, + cnf_text=cnf_e2e, + ) + snapshot_app_b = build_app( + database_path=snapshot_db_path, + signing_key_hex=key_hex, + submit_min_interval_secs=0, + ) + with TestClient(snapshot_app_a) as snapshot_client_a, TestClient(snapshot_app_b) as snapshot_client_b: + cross_latest = snapshot_client_a.get("/sat/latest.json") + cross_latest_json = cross_latest.json() + cross_seq = str(cross_latest_json.get("sequence")) + cross_board = snapshot_client_b.get( + cross_latest_json["artifacts"]["board"]["url"]) + cross_weights = snapshot_client_b.get( + cross_latest_json["artifacts"]["weights"]["url"]) + ck("sat sequence artifacts resolve across split read replicas", + cross_latest.status_code == 200 + and cross_board.status_code == 200 + and cross_weights.status_code == 200 + and cross_board.headers.get("x-cathedral-sequence") == cross_seq + and cross_weights.headers.get("x-cathedral-sequence") == cross_seq) + ck("cross-replica sat artifact bytes match signed pointer hashes", + "sha256:" + hashlib.sha256(cross_board.content).hexdigest() + == cross_latest_json["artifacts"]["board"]["hash"] + and "sha256:" + hashlib.sha256(cross_weights.content).hexdigest() + == cross_latest_json["artifacts"]["weights"]["hash"]) + finally: + try: + if snapshot_db_path: + os.unlink(snapshot_db_path) + except Exception: + pass old_rpm = os.environ.get("CATHEDRAL_RATELIMIT_RPM") os.environ["CATHEDRAL_RATELIMIT_RPM"] = "1" try: diff --git a/scaffold/publisher/app.py b/scaffold/publisher/app.py index 8ffdf6710..a338726e6 100644 --- a/scaffold/publisher/app.py +++ b/scaffold/publisher/app.py @@ -169,7 +169,13 @@ def _refresh(self, key: Any, builder) -> None: def _now_iso_ms() -> str: - dt = datetime.now(timezone.utc) + return _iso_ms(datetime.now(timezone.utc)) + + +def _iso_ms(dt: datetime) -> str: + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + dt = dt.astimezone(timezone.utc) return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z" @@ -452,6 +458,104 @@ def _snapshot_hash(payload: dict[str, Any]) -> str: def _snapshot_etag(payload: dict[str, Any]) -> str: return '"' + hashlib.sha256(_snapshot_bytes(payload)).hexdigest() + '"' + snapshot_lock = threading.Lock() + snapshot_ring_size = max(1, _env_int("CATHEDRAL_SAT_SNAPSHOT_RING_SIZE", 16)) + snapshot_retention_hours = max(1.0, _env_float("CATHEDRAL_SAT_SNAPSHOT_RETENTION_HOURS", 6.0)) + snapshot_ring: dict[str, dict[str, Any]] = {} + snapshot_order: list[str] = [] + + def _snapshot_item( + latest: dict[str, Any], + board_payload: dict[str, Any], + weights_payload: dict[str, Any], + ) -> dict[str, Any]: + latest_bytes = _snapshot_bytes(latest) + board_bytes = _snapshot_bytes(board_payload) + weights_bytes = _snapshot_bytes(weights_payload) + return { + "latest": latest, + "board": board_payload, + "weights": weights_payload, + "latest_bytes": latest_bytes, + "board_bytes": board_bytes, + "weights_bytes": weights_bytes, + "latest_etag": '"' + hashlib.sha256(latest_bytes).hexdigest() + '"', + "board_etag": '"' + hashlib.sha256(board_bytes).hexdigest() + '"', + "weights_etag": '"' + hashlib.sha256(weights_bytes).hexdigest() + '"', + } + + def _snapshot_cache_local(sequence: str, item: dict[str, Any]) -> None: + if sequence not in snapshot_ring: + snapshot_order.append(sequence) + snapshot_ring[sequence] = item + while len(snapshot_order) > snapshot_ring_size: + old = snapshot_order.pop(0) + snapshot_ring.pop(old, None) + + def _snapshot_remember( + sequence: str, + latest: dict[str, Any], + board_payload: dict[str, Any], + weights_payload: dict[str, Any], + ) -> None: + item = _snapshot_item(latest, board_payload, weights_payload) + with snapshot_lock: + seen_local = sequence in snapshot_ring + _snapshot_cache_local(sequence, item) + if seen_local: + return + + cutoff = _iso_ms(datetime.now(timezone.utc) - timedelta(hours=snapshot_retention_hours)) + + def _store_snapshot(conn): + conn.execute( + "INSERT OR REPLACE INTO sat_snapshots(" + "sequence, created_at_iso, latest_json, board_json, weights_json, " + "latest_etag, board_etag, weights_etag" + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ( + sequence, + str(latest.get("created_at") or _now_iso_ms()), + item["latest_bytes"].decode("utf-8"), + item["board_bytes"].decode("utf-8"), + item["weights_bytes"].decode("utf-8"), + item["latest_etag"], + item["board_etag"], + item["weights_etag"], + ), + ) + conn.execute( + "DELETE FROM sat_snapshots WHERE created_at_iso < ?", + (cutoff,), + ) + + store.write(_store_snapshot) + + def _snapshot_lookup(sequence: str) -> dict[str, Any] | None: + with snapshot_lock: + item = snapshot_ring.get(sequence) + if item is not None: + return dict(item) + rows_ = store.query( + "SELECT latest_json, board_json, weights_json, latest_etag, " + "board_etag, weights_etag FROM sat_snapshots WHERE sequence=?", + (sequence,), + ) + if not rows_: + return None + row = rows_[0] + item = { + "latest_bytes": str(row["latest_json"]).encode("utf-8"), + "board_bytes": str(row["board_json"]).encode("utf-8"), + "weights_bytes": str(row["weights_json"]).encode("utf-8"), + "latest_etag": row["latest_etag"], + "board_etag": row["board_etag"], + "weights_etag": row["weights_etag"], + } + with snapshot_lock: + _snapshot_cache_local(sequence, item) + return dict(item) + def _sign_latest_pointer(payload: dict[str, Any]) -> dict[str, Any]: signed = dict(payload) signed["key_id"] = os.environ.get( @@ -464,11 +568,13 @@ def _sign_latest_pointer(payload: dict[str, Any]) -> dict[str, Any]: return signed def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, Any], str]: - board_payload, board_etag = board_cache.get() + board_payload, _board_cache_etag = board_cache.get() weight_key = os.environ.get(weights_mod.SIGNING_KEY_ENV, "").strip() or key_hex weights_payload = weights_mod.current_vector(store, signing_key_hex=weight_key) board_hash = _snapshot_hash(board_payload) weights_hash = _snapshot_hash(weights_payload) + board_etag = _snapshot_etag(board_payload) + weights_etag = _snapshot_etag(weights_payload) policy_version = str(weights_payload.get("policy_version") or int(time.time() * 1000)) sequence_digest = hashlib.sha256( f"{policy_version}:{board_hash}:{weights_hash}".encode("utf-8") @@ -484,7 +590,7 @@ def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, An "created_at": created_at, "publisher_generation_id": os.environ.get( "CATHEDRAL_PUBLISHER_GENERATION_ID", "default"), - "storage": "in_process_current_snapshot", + "storage": "in_process_recent_snapshot_ring", "trust_root": "signed_latest_pointer_and_artifact_hashes", "artifacts": { "board": { @@ -499,6 +605,7 @@ def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, An "content_type": "application/json", "hash": weights_hash, "size_bytes": len(_snapshot_bytes(weights_payload)), + "etag": weights_etag, "signature": "embedded", "generated_at": weights_payload.get("generated_at"), "expires_at": weights_payload.get("expires_at"), @@ -521,11 +628,16 @@ def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, An }, } signed_pointer = _sign_latest_pointer(pointer) + _snapshot_remember(sequence, signed_pointer, board_payload, weights_payload) return signed_pointer, board_payload, weights_payload, _snapshot_etag(signed_pointer) def _sat_snapshot_headers(etag: str, sequence: str, *, immutable: bool = False) -> dict[str, str]: + # Sequence URLs are content-addressed by the signed latest pointer, but + # this transition slice stores only a small in-process history. Do not + # advertise year-long immutable origin availability until snapshots are + # written to durable object storage. cache_control = ( - "public, max-age=31536000, immutable" + "public, max-age=300, stale-while-revalidate=600" if immutable else "public, max-age=5, must-revalidate" ) @@ -2003,13 +2115,18 @@ async def sat_latest(request: Request): inm = request.headers.get("if-none-match") if inm and etag in [t.strip() for t in inm.split(",")]: return Response(status_code=304, headers=headers) - return JSONResponse(latest, headers=headers) + return Response( + content=_snapshot_bytes(latest), + media_type="application/json", + headers=headers, + ) @app.get("/sat/sequences/{sequence}/board.json") async def sat_sequence_board(sequence: str, request: Request): - latest, board_payload, _weights, _latest_etag = _sat_snapshot_bundle() - current_sequence = str(latest["sequence"]) - if sequence != current_sequence: + snapshot = _snapshot_lookup(sequence) + if snapshot is None: + latest, _board_payload, _weights, _latest_etag = _sat_snapshot_bundle() + current_sequence = str(latest["sequence"]) raise HTTPException( 404, { @@ -2017,18 +2134,24 @@ async def sat_sequence_board(sequence: str, request: Request): "current_sequence": current_sequence, }, ) - etag = _snapshot_etag(board_payload) + board_bytes = snapshot["board_bytes"] + etag = str(snapshot["board_etag"]) headers = _sat_snapshot_headers(etag, sequence, immutable=True) inm = request.headers.get("if-none-match") if inm and etag in [t.strip() for t in inm.split(",")]: return Response(status_code=304, headers=headers) - return JSONResponse(board_payload, headers=headers) + return Response( + content=board_bytes, + media_type="application/json", + headers=headers, + ) @app.get("/sat/sequences/{sequence}/weights.json") async def sat_sequence_weights(sequence: str, request: Request): - latest, _board, weights_payload, _latest_etag = _sat_snapshot_bundle() - current_sequence = str(latest["sequence"]) - if sequence != current_sequence: + snapshot = _snapshot_lookup(sequence) + if snapshot is None: + latest, _board, _weights_payload, _latest_etag = _sat_snapshot_bundle() + current_sequence = str(latest["sequence"]) raise HTTPException( 404, { @@ -2036,12 +2159,17 @@ async def sat_sequence_weights(sequence: str, request: Request): "current_sequence": current_sequence, }, ) - etag = _snapshot_etag(weights_payload) + weights_bytes = snapshot["weights_bytes"] + etag = str(snapshot["weights_etag"]) headers = _sat_snapshot_headers(etag, sequence, immutable=True) inm = request.headers.get("if-none-match") if inm and etag in [t.strip() for t in inm.split(",")]: return Response(status_code=304, headers=headers) - return JSONResponse(weights_payload, headers=headers) + return Response( + content=weights_bytes, + media_type="application/json", + headers=headers, + ) @app.get("/sat/events") async def sat_events(request: Request, once: bool = Query(False)): diff --git a/scaffold/publisher/store.py b/scaffold/publisher/store.py index 694362c86..562ad65e9 100644 --- a/scaffold/publisher/store.py +++ b/scaffold/publisher/store.py @@ -389,6 +389,20 @@ CREATE INDEX IF NOT EXISTS idx_metagraph_hotkeys_fresh ON metagraph_hotkeys(network, netuid, updated_at_iso); """), + ("0029_sat_snapshots", """ + CREATE TABLE IF NOT EXISTS sat_snapshots ( + sequence TEXT PRIMARY KEY, + created_at_iso TEXT NOT NULL, + latest_json TEXT NOT NULL, + board_json TEXT NOT NULL, + weights_json TEXT NOT NULL, + latest_etag TEXT NOT NULL, + board_etag TEXT NOT NULL, + weights_etag TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_sat_snapshots_created_at + ON sat_snapshots(created_at_iso); + """), ] # Postgres DDL — the same logical schema, portable. REAL->DOUBLE PRECISION, @@ -718,6 +732,20 @@ CREATE INDEX IF NOT EXISTS idx_metagraph_hotkeys_fresh ON metagraph_hotkeys(network, netuid, updated_at_iso); """), + ("0029_sat_snapshots", """ + CREATE TABLE IF NOT EXISTS sat_snapshots ( + sequence TEXT PRIMARY KEY, + created_at_iso TEXT NOT NULL, + latest_json TEXT NOT NULL, + board_json TEXT NOT NULL, + weights_json TEXT NOT NULL, + latest_etag TEXT NOT NULL, + board_etag TEXT NOT NULL, + weights_etag TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_sat_snapshots_created_at + ON sat_snapshots(created_at_iso); + """), ] # Conflict targets for INSERT OR REPLACE / INSERT OR IGNORE upserts that name no @@ -745,6 +773,7 @@ "tee_gpu_capacity_events": "id", "attest_nonces": "nonce", "attestations": "id", + "sat_snapshots": "sequence", } From 48eb378a4515e8d8e9a120f17ac4988e39f2cdf4 Mon Sep 17 00:00:00 2001 From: ancientrunner Date: Thu, 25 Jun 2026 22:24:38 -0400 Subject: [PATCH 4/4] Keep SAT snapshot reads replica-safe --- publisher_verify.py | 81 ++++++++++++++++++++++++++++++++++++++- scaffold/publisher/app.py | 81 +++++++++++++++++++++++++++++++++------ 2 files changed, 150 insertions(+), 12 deletions(-) diff --git a/publisher_verify.py b/publisher_verify.py index ba0a1925c..73fe779b6 100644 --- a/publisher_verify.py +++ b/publisher_verify.py @@ -337,6 +337,52 @@ def ck(name: str, cond: bool) -> None: == cross_latest_json["artifacts"]["board"]["hash"] and "sha256:" + hashlib.sha256(cross_weights.content).hexdigest() == cross_latest_json["artifacts"]["weights"]["hash"]) + + snapshot_fail_app_a = build_app( + database_path=snapshot_db_path, + signing_key_hex=key_hex, + submit_min_interval_secs=0, + ) + snapshot_fail_app_b = build_app( + database_path=snapshot_db_path, + signing_key_hex=key_hex, + submit_min_interval_secs=0, + ) + snapshot_fail_app_a.state.store.write( + lambda conn: conn.execute("DELETE FROM sat_snapshots")) + original_write = snapshot_fail_app_a.state.store.write + failed_snapshot_write = {"done": False} + + class _FailOneSnapshotInsert: + def __init__(self, conn): + self._conn = conn + + def execute(self, sql, *args, **kwargs): + if ( + not failed_snapshot_write["done"] + and "INSERT OR REPLACE INTO sat_snapshots" in str(sql) + ): + failed_snapshot_write["done"] = True + raise RuntimeError("forced_sat_snapshot_write_failure") + return self._conn.execute(sql, *args, **kwargs) + + def __getattr__(self, name): + return getattr(self._conn, name) + + def _flaky_write(fn): + return original_write(lambda conn: fn(_FailOneSnapshotInsert(conn))) + + snapshot_fail_app_a.state.store.write = _flaky_write + with TestClient(snapshot_fail_app_a, raise_server_exceptions=False) as fail_client_a, TestClient(snapshot_fail_app_b) as fail_client_b: + failed_latest = fail_client_a.get("/sat/latest.json") + retry_latest = fail_client_a.get("/sat/latest.json") + retry_latest_json = retry_latest.json() + retry_board = fail_client_b.get(retry_latest_json["artifacts"]["board"]["url"]) + ck("sat snapshot write failure does not poison local cache", + failed_latest.status_code == 500 + and retry_latest.status_code == 200 + and retry_board.status_code == 200 + and failed_snapshot_write["done"]) finally: try: if snapshot_db_path: @@ -367,10 +413,31 @@ def ck(name: str, cond: bool) -> None: "CATHEDRAL_REFILL_ENABLED": os.environ.get("CATHEDRAL_REFILL_ENABLED"), } try: + role_db_path = "" + with _snapshot_tempfile.NamedTemporaryFile(suffix=".sqlite", delete=False) as _role_db: + role_db_path = _role_db.name + os.environ["CATHEDRAL_SERVICE_ROLE"] = "all" + role_seed_app = build_app(database_path=role_db_path, signing_key_hex=key_hex, + submit_min_interval_secs=0) + seed_challenge( + role_seed_app.state.store, + challenge_id="sat-snapshot-role-seed", + tier=1, + cnf_text=cnf_e2e, + ) + with TestClient(role_seed_app) as role_seed_client: + role_seed_latest = role_seed_client.get("/sat/latest.json") + ck("role test prepublishes SAT snapshot", + role_seed_latest.status_code == 200) + os.environ["CATHEDRAL_SERVICE_ROLE"] = "read" os.environ["CATHEDRAL_REFILL_ENABLED"] = "true" - role_app = build_app(database_path=":memory:", signing_key_hex=key_hex, + role_app = build_app(database_path=role_db_path, signing_key_hex=key_hex, submit_min_interval_secs=0) + before_snapshots = role_app.state.store.query( + "SELECT COUNT(*) AS count FROM sat_snapshots")[0]["count"] + before_weight_policy = role_app.state.store.query( + "SELECT COUNT(*) AS count FROM weight_policy_state")[0]["count"] with TestClient(role_app) as role_client: role_health = role_client.get("/health/live").json() ck("service role appears in health", role_health["service_role"] == "read") @@ -389,6 +456,13 @@ def ck(name: str, cond: bool) -> None: read_latest.status_code == 200 and read_snapshot.status_code == 200 and read_events.status_code == 200) + after_snapshots = role_app.state.store.query( + "SELECT COUNT(*) AS count FROM sat_snapshots")[0]["count"] + after_weight_policy = role_app.state.store.query( + "SELECT COUNT(*) AS count FROM weight_policy_state")[0]["count"] + ck("read role serves SAT snapshots without publishing", + before_snapshots == after_snapshots + and before_weight_policy == after_weight_policy) os.environ["CATHEDRAL_SERVICE_ROLE"] = "submit" os.environ["CATHEDRAL_REFILL_ENABLED"] = "false" @@ -500,6 +574,11 @@ def _seed_retention_rows(conn): os.environ.pop(_key, None) else: os.environ[_key] = _value + try: + if "role_db_path" in locals() and role_db_path: + os.unlink(role_db_path) + except Exception: + pass from bittensor_wallet import Keypair # noqa miner = Keypair.create_from_uri("//E2EMiner") diff --git a/scaffold/publisher/app.py b/scaffold/publisher/app.py index a338726e6..8668acfba 100644 --- a/scaffold/publisher/app.py +++ b/scaffold/publisher/app.py @@ -501,7 +501,6 @@ def _snapshot_remember( item = _snapshot_item(latest, board_payload, weights_payload) with snapshot_lock: seen_local = sequence in snapshot_ring - _snapshot_cache_local(sequence, item) if seen_local: return @@ -530,6 +529,8 @@ def _store_snapshot(conn): ) store.write(_store_snapshot) + with snapshot_lock: + _snapshot_cache_local(sequence, item) def _snapshot_lookup(sequence: str) -> dict[str, Any] | None: with snapshot_lock: @@ -556,6 +557,42 @@ def _snapshot_lookup(sequence: str) -> dict[str, Any] | None: _snapshot_cache_local(sequence, item) return dict(item) + def _snapshot_latest_persisted() -> dict[str, Any] | None: + rows_ = store.query( + "SELECT sequence, latest_json, board_json, weights_json, latest_etag, " + "board_etag, weights_etag FROM sat_snapshots " + "ORDER BY created_at_iso DESC, sequence DESC LIMIT 1" + ) + if not rows_: + return None + row = rows_[0] + sequence = str(row["sequence"]) + item = { + "latest_bytes": str(row["latest_json"]).encode("utf-8"), + "board_bytes": str(row["board_json"]).encode("utf-8"), + "weights_bytes": str(row["weights_json"]).encode("utf-8"), + "latest_etag": row["latest_etag"], + "board_etag": row["board_etag"], + "weights_etag": row["weights_etag"], + } + with snapshot_lock: + _snapshot_cache_local(sequence, item) + return dict(item) + + def _sat_latest_snapshot(*, allow_publish: bool) -> tuple[bytes, str, str] | None: + persisted = _snapshot_latest_persisted() + if persisted is not None: + try: + latest_obj = json.loads(persisted["latest_bytes"].decode("utf-8")) + sequence = str(latest_obj.get("sequence") or "") + except Exception: + sequence = "" + return persisted["latest_bytes"], str(persisted["latest_etag"]), sequence + if not allow_publish: + return None + latest, _board, _weights, etag = _sat_snapshot_bundle() + return _snapshot_bytes(latest), etag, str(latest["sequence"]) + def _sign_latest_pointer(payload: dict[str, Any]) -> dict[str, Any]: signed = dict(payload) signed["key_id"] = os.environ.get( @@ -590,7 +627,7 @@ def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, An "created_at": created_at, "publisher_generation_id": os.environ.get( "CATHEDRAL_PUBLISHER_GENERATION_ID", "default"), - "storage": "in_process_recent_snapshot_ring", + "storage": "shared_db_recent_snapshot_cache", "trust_root": "signed_latest_pointer_and_artifact_hashes", "artifacts": { "board": { @@ -624,7 +661,7 @@ def _sat_snapshot_bundle() -> tuple[dict[str, Any], dict[str, Any], dict[str, An "compatibility": { "legacy_read_endpoints_kept": True, "legacy_validator_weights_kept": True, - "historical_sequence_storage": "not_yet_published", + "historical_sequence_storage": "bounded_shared_db_cache", }, } signed_pointer = _sign_latest_pointer(pointer) @@ -2110,13 +2147,16 @@ async def challenge_broadcast(request: Request): @app.get("/sat/latest.json") async def sat_latest(request: Request): - latest, _board, _weights, etag = _sat_snapshot_bundle() - headers = _sat_snapshot_headers(etag, str(latest["sequence"])) + latest_snapshot = _sat_latest_snapshot(allow_publish=service_role != "read") + if latest_snapshot is None: + raise HTTPException(503, {"detail": "sat_snapshot_not_published"}) + latest_bytes, etag, sequence = latest_snapshot + headers = _sat_snapshot_headers(etag, sequence) inm = request.headers.get("if-none-match") if inm and etag in [t.strip() for t in inm.split(",")]: return Response(status_code=304, headers=headers) return Response( - content=_snapshot_bytes(latest), + content=latest_bytes, media_type="application/json", headers=headers, ) @@ -2125,8 +2165,13 @@ async def sat_latest(request: Request): async def sat_sequence_board(sequence: str, request: Request): snapshot = _snapshot_lookup(sequence) if snapshot is None: - latest, _board_payload, _weights, _latest_etag = _sat_snapshot_bundle() - current_sequence = str(latest["sequence"]) + latest_snapshot = _sat_latest_snapshot(allow_publish=service_role != "read") + current_sequence = "" + if latest_snapshot is not None: + try: + current_sequence = str(json.loads(latest_snapshot[0].decode("utf-8")).get("sequence") or "") + except Exception: + current_sequence = "" raise HTTPException( 404, { @@ -2150,8 +2195,13 @@ async def sat_sequence_board(sequence: str, request: Request): async def sat_sequence_weights(sequence: str, request: Request): snapshot = _snapshot_lookup(sequence) if snapshot is None: - latest, _board, _weights_payload, _latest_etag = _sat_snapshot_bundle() - current_sequence = str(latest["sequence"]) + latest_snapshot = _sat_latest_snapshot(allow_publish=service_role != "read") + current_sequence = "" + if latest_snapshot is not None: + try: + current_sequence = str(json.loads(latest_snapshot[0].decode("utf-8")).get("sequence") or "") + except Exception: + current_sequence = "" raise HTTPException( 404, { @@ -2174,6 +2224,8 @@ async def sat_sequence_weights(sequence: str, request: Request): @app.get("/sat/events") async def sat_events(request: Request, once: bool = Query(False)): heartbeat_secs = max(1.0, _env_float("CATHEDRAL_SSE_HEARTBEAT_SECS", 30.0)) + if service_role == "read" and _snapshot_latest_persisted() is None: + raise HTTPException(503, {"detail": "sat_snapshot_not_published"}) def _snapshot_event_frame(latest: dict[str, Any]) -> str: event = { @@ -2194,7 +2246,14 @@ def _snapshot_event_frame(latest: dict[str, Any]) -> str: async def _stream(): last_sequence = request.headers.get("last-event-id", "") while True: - latest, _board, _weights, _latest_etag = _sat_snapshot_bundle() + latest_snapshot = _sat_latest_snapshot(allow_publish=service_role != "read") + if latest_snapshot is None: + yield ": snapshot_not_published\n\n" + if once or await request.is_disconnected(): + return + await asyncio.sleep(heartbeat_secs) + continue + latest = json.loads(latest_snapshot[0].decode("utf-8")) sequence = str(latest["sequence"]) if sequence != last_sequence: yield _snapshot_event_frame(latest)