Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions deploy/DISTRIBUTION_ARCHITECTURE_PLAN.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,3 +703,25 @@ Acceptance:

This slice moves Cathedral away from hot database reads without changing SAT
scoring or miner submit semantics.

## Current Compatibility Slice

The first mergeable slice keeps existing miner and validator URLs working while
adding the scalable read contract:

- `GET /sat/latest.json` returns a signed latest pointer with sequence, hashes,
sizes, and artifact URLs.
- `GET /sat/sequences/{sequence}/board.json` returns the current board snapshot
for a recent sequence persisted in the shared publisher database.
- `GET /sat/sequences/{sequence}/weights.json` returns the current signed
weights snapshot for a recent sequence persisted in the shared publisher
database.
- `GET /sat/events` streams SSE hints containing only sequence and latest URL.
- `GET /v1/agents/receipts/{receipt_id}` returns durable submit status without
scraping recent activity.

This is deliberately a transition layer: it does not yet publish historical
immutable object storage, CNF artifacts, transactional outbox rows, or async
verification workers. Sequence artifact responses serve byte-exact,
hash-verifiable JSON from a bounded shared-database cache, but they use bounded
cache headers rather than claiming permanent origin availability.
198 changes: 197 additions & 1 deletion publisher_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,145 @@ def ck(name: str, cond: bool) -> None:
headers={"If-None-Match": bc.headers.get("etag", "")},
)
ck("challenge-broadcast supports ETag 304", bc_304.status_code == 304)
latest = client.get("/sat/latest.json")
latest_json = latest.json()
latest_etag = latest.headers.get("etag", "")
latest_seq = str(latest_json.get("sequence"))
ck("sat latest pointer exposes signed snapshot artifacts",
latest.status_code == 200
and latest_json.get("schema") == "cathedral.sat.latest.v1"
and latest_json.get("signature")
and latest_json.get("artifacts", {}).get("board", {}).get("url")
== f"/sat/sequences/{latest_seq}/board.json"
and latest_json.get("artifacts", {}).get("weights", {}).get("url")
== f"/sat/sequences/{latest_seq}/weights.json"
and latest_json.get("artifacts", {}).get("board", {}).get("hash", "").startswith("sha256:"))
latest_304 = client.get("/sat/latest.json", headers={"If-None-Match": latest_etag})
ck("sat latest pointer supports ETag 304", latest_304.status_code == 304)
sat_board = client.get(f"/sat/sequences/{latest_seq}/board.json")
ck("sat board snapshot matches active-challenges",
sat_board.status_code == 200
and sat_board.json()["items"][0]["challenge_id"] == ac["items"][0]["challenge_id"]
and sat_board.headers.get("x-cathedral-sequence") == latest_seq
and sat_board.headers.get("etag") == latest_json["artifacts"]["board"]["etag"])
ck("sat board snapshot bytes match signed pointer hash and size",
"sha256:" + hashlib.sha256(sat_board.content).hexdigest()
== latest_json["artifacts"]["board"]["hash"]
and len(sat_board.content) == latest_json["artifacts"]["board"]["size_bytes"])
sat_weights = client.get(f"/sat/sequences/{latest_seq}/weights.json")
ck("sat weights snapshot preserves signed vector shape",
sat_weights.status_code == 200
and sat_weights.json().get("signature")
and isinstance(sat_weights.json().get("weights"), list)
and sat_weights.headers.get("x-cathedral-sequence") == latest_seq
and sat_weights.headers.get("etag") == latest_json["artifacts"]["weights"]["etag"])
ck("sat weights snapshot bytes match signed pointer hash and size",
"sha256:" + hashlib.sha256(sat_weights.content).hexdigest()
== latest_json["artifacts"]["weights"]["hash"]
and len(sat_weights.content) == latest_json["artifacts"]["weights"]["size_bytes"])
ck("sat sequence artifacts do not claim durable immutable origin storage yet",
"immutable" not in (sat_board.headers.get("cache-control") or "").lower()
and "31536000" not in (sat_board.headers.get("cache-control") or ""))
stale_snapshot = client.get("/sat/sequences/stale-test-sequence/board.json")
ck("sat stale sequence fails closed", stale_snapshot.status_code == 404)
sat_events = client.get("/sat/events", params={"once": "true"})
ck("sat events emits latest pointer hint only",
sat_events.status_code == 200
and "event: cathedral.sat.snapshot" in sat_events.text
and '"latest_url":"/sat/latest.json"' in sat_events.text
and '"sequence":"' in sat_events.text)
import tempfile as _snapshot_tempfile
snapshot_db_path = ""
try:
with _snapshot_tempfile.NamedTemporaryFile(suffix=".sqlite", delete=False) as _snapshot_db:
snapshot_db_path = _snapshot_db.name
snapshot_app_a = build_app(
database_path=snapshot_db_path,
signing_key_hex=key_hex,
submit_min_interval_secs=0,
)
seed_challenge(
snapshot_app_a.state.store,
challenge_id="sat-snapshot-cross-app",
tier=1,
cnf_text=cnf_e2e,
)
snapshot_app_b = build_app(
database_path=snapshot_db_path,
signing_key_hex=key_hex,
submit_min_interval_secs=0,
)
with TestClient(snapshot_app_a) as snapshot_client_a, TestClient(snapshot_app_b) as snapshot_client_b:
cross_latest = snapshot_client_a.get("/sat/latest.json")
cross_latest_json = cross_latest.json()
cross_seq = str(cross_latest_json.get("sequence"))
cross_board = snapshot_client_b.get(
cross_latest_json["artifacts"]["board"]["url"])
cross_weights = snapshot_client_b.get(
cross_latest_json["artifacts"]["weights"]["url"])
ck("sat sequence artifacts resolve across split read replicas",
cross_latest.status_code == 200
and cross_board.status_code == 200
and cross_weights.status_code == 200
and cross_board.headers.get("x-cathedral-sequence") == cross_seq
and cross_weights.headers.get("x-cathedral-sequence") == cross_seq)
ck("cross-replica sat artifact bytes match signed pointer hashes",
"sha256:" + hashlib.sha256(cross_board.content).hexdigest()
== cross_latest_json["artifacts"]["board"]["hash"]
and "sha256:" + hashlib.sha256(cross_weights.content).hexdigest()
== cross_latest_json["artifacts"]["weights"]["hash"])

snapshot_fail_app_a = build_app(
database_path=snapshot_db_path,
signing_key_hex=key_hex,
submit_min_interval_secs=0,
)
snapshot_fail_app_b = build_app(
database_path=snapshot_db_path,
signing_key_hex=key_hex,
submit_min_interval_secs=0,
)
snapshot_fail_app_a.state.store.write(
lambda conn: conn.execute("DELETE FROM sat_snapshots"))
original_write = snapshot_fail_app_a.state.store.write
failed_snapshot_write = {"done": False}

class _FailOneSnapshotInsert:
def __init__(self, conn):
self._conn = conn

def execute(self, sql, *args, **kwargs):
if (
not failed_snapshot_write["done"]
and "INSERT OR REPLACE INTO sat_snapshots" in str(sql)
):
failed_snapshot_write["done"] = True
raise RuntimeError("forced_sat_snapshot_write_failure")
return self._conn.execute(sql, *args, **kwargs)

def __getattr__(self, name):
return getattr(self._conn, name)

def _flaky_write(fn):
return original_write(lambda conn: fn(_FailOneSnapshotInsert(conn)))

snapshot_fail_app_a.state.store.write = _flaky_write
with TestClient(snapshot_fail_app_a, raise_server_exceptions=False) as fail_client_a, TestClient(snapshot_fail_app_b) as fail_client_b:
failed_latest = fail_client_a.get("/sat/latest.json")
retry_latest = fail_client_a.get("/sat/latest.json")
retry_latest_json = retry_latest.json()
retry_board = fail_client_b.get(retry_latest_json["artifacts"]["board"]["url"])
ck("sat snapshot write failure does not poison local cache",
failed_latest.status_code == 500
and retry_latest.status_code == 200
and retry_board.status_code == 200
and failed_snapshot_write["done"])
finally:
try:
if snapshot_db_path:
os.unlink(snapshot_db_path)
except Exception:
pass
old_rpm = os.environ.get("CATHEDRAL_RATELIMIT_RPM")
os.environ["CATHEDRAL_RATELIMIT_RPM"] = "1"
try:
Expand All @@ -274,10 +413,31 @@ def ck(name: str, cond: bool) -> None:
"CATHEDRAL_REFILL_ENABLED": os.environ.get("CATHEDRAL_REFILL_ENABLED"),
}
try:
role_db_path = ""
with _snapshot_tempfile.NamedTemporaryFile(suffix=".sqlite", delete=False) as _role_db:
role_db_path = _role_db.name
os.environ["CATHEDRAL_SERVICE_ROLE"] = "all"
role_seed_app = build_app(database_path=role_db_path, signing_key_hex=key_hex,
submit_min_interval_secs=0)
seed_challenge(
role_seed_app.state.store,
challenge_id="sat-snapshot-role-seed",
tier=1,
cnf_text=cnf_e2e,
)
with TestClient(role_seed_app) as role_seed_client:
role_seed_latest = role_seed_client.get("/sat/latest.json")
ck("role test prepublishes SAT snapshot",
role_seed_latest.status_code == 200)

os.environ["CATHEDRAL_SERVICE_ROLE"] = "read"
os.environ["CATHEDRAL_REFILL_ENABLED"] = "true"
role_app = build_app(database_path=":memory:", signing_key_hex=key_hex,
role_app = build_app(database_path=role_db_path, signing_key_hex=key_hex,
submit_min_interval_secs=0)
before_snapshots = role_app.state.store.query(
"SELECT COUNT(*) AS count FROM sat_snapshots")[0]["count"]
before_weight_policy = role_app.state.store.query(
"SELECT COUNT(*) AS count FROM weight_policy_state")[0]["count"]
with TestClient(role_app) as role_client:
role_health = role_client.get("/health/live").json()
ck("service role appears in health", role_health["service_role"] == "read")
Expand All @@ -288,6 +448,21 @@ def ck(name: str, cond: bool) -> None:
read_submit.status_code == 404
and read_submit.text == "route_not_served_by_read_role"
and read_submit.headers.get("x-cathedral-service-role") == "read")
read_latest = role_client.get("/sat/latest.json")
read_seq = str(read_latest.json().get("sequence")) if read_latest.status_code == 200 else ""
read_snapshot = role_client.get(f"/sat/sequences/{read_seq}/board.json")
read_events = role_client.get("/sat/events", params={"once": "true"})
ck("read role serves scalable SAT snapshot endpoints",
read_latest.status_code == 200
and read_snapshot.status_code == 200
and read_events.status_code == 200)
after_snapshots = role_app.state.store.query(
"SELECT COUNT(*) AS count FROM sat_snapshots")[0]["count"]
after_weight_policy = role_app.state.store.query(
"SELECT COUNT(*) AS count FROM weight_policy_state")[0]["count"]
ck("read role serves SAT snapshots without publishing",
before_snapshots == after_snapshots
and before_weight_policy == after_weight_policy)

os.environ["CATHEDRAL_SERVICE_ROLE"] = "submit"
os.environ["CATHEDRAL_REFILL_ENABLED"] = "false"
Expand All @@ -305,6 +480,10 @@ def ck(name: str, cond: bool) -> None:
submit_cnf = submit_role_client.get("/v1/synthetic-boolean/active-cnf")
ck("submit role allows miner CNF route to reach auth validation",
submit_cnf.status_code == 422)
submit_receipt = submit_role_client.get("/v1/agents/receipts/not-real")
ck("submit role serves receipt status route",
submit_receipt.status_code == 404
and submit_receipt.headers.get("x-cathedral-service-role") is None)

os.environ["CATHEDRAL_SERVICE_ROLE"] = "worker"
worker_role_app = build_app(database_path=":memory:", signing_key_hex=key_hex,
Expand Down Expand Up @@ -395,6 +574,11 @@ def _seed_retention_rows(conn):
os.environ.pop(_key, None)
else:
os.environ[_key] = _value
try:
if "role_db_path" in locals() and role_db_path:
os.unlink(role_db_path)
except Exception:
pass

from bittensor_wallet import Keypair # noqa
miner = Keypair.create_from_uri("//E2EMiner")
Expand Down Expand Up @@ -730,6 +914,18 @@ def _offer_sig(body: dict, *, when: str):

ck("first solve gets open-window rank 1", resp.json().get("solve_rank") == 1)
ck("submit row emits base weighted_score 1.0", resp.json().get("weighted_score") == 1.0)
receipt = client.get(f"/v1/agents/receipts/{resp.json().get('id')}")
receipt_json = receipt.json() if receipt.status_code == 200 else {}
ck("submit receipt endpoint returns durable accepted status",
receipt.status_code == 200
and receipt_json.get("schema") == "cathedral.submit_receipt.v1"
and receipt_json.get("challenge_id") == "sat-e2e-1"
and receipt_json.get("miner_hotkey") == miner.ss58_address
and receipt_json.get("status") == "ranked"
and receipt.headers.get("cache-control") == "no-store")
missing_receipt = client.get("/v1/agents/receipts/not-a-real-receipt")
ck("submit receipt endpoint fails closed for missing IDs",
missing_receipt.status_code == 404)
explain = client.get(
"/v1/leaderboard/explain",
params={"miner_hotkey": miner.ss58_address},
Expand Down
Loading
Loading