Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"crates/trios-igla-race",
"bin/tri-railway",
"bin/seed-agent",
"bin/ledger-daemon",
]

[workspace.package]
Expand Down
23 changes: 23 additions & 0 deletions bin/ledger-daemon/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "ledger-daemon"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
description = "Scarabaeus Engine watchdog (Khepri-3): autoscaler, dead-worker resurrector, leak gate, anomaly alerts. Reads experiment_queue / workers / bpb_samples; writes gardener_runs and Railway redeploy calls."

[[bin]]
name = "ledger-daemon"
path = "src/main.rs"

[dependencies]
anyhow = "1.0"
tokio = { version = "1.40", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false }
41 changes: 41 additions & 0 deletions bin/ledger-daemon/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# ledger-daemon — Scarabaeus Engine watchdog (Khepri-3)

> "Хепри катит шар через Дуат. Скарабей не просит указаний — он просто катит."

This crate is the **gaze of Khepri** — the eye that sees the whole fleet from
above and corrects course. It runs as a single Railway service (one replica),
ticks every 30 s, and performs four jobs:

1. **Dead-worker resurrector** — workers whose `last_heartbeat` exceeds
`WORKER_DEAD_AFTER_SECS` (default 120 s) are redeployed via Railway API.
2. **Autoscaler** — computes desired replicas from `(queue_depth, target_throughput)`
and tells the seed-agent service group to grow/shrink (min=2, max=12).
3. **Leak gate** — every `experiment_queue` row that lands `done` with
`final_bpb` below `LEDGER_LEAK_BPB_THRESHOLD` (default 0.1) gets a `last_error`
starting with `SCARABAEUS-LEAK-CANDIDATE` and is ignored by ratification
pools until a held-out evaluator (Khepri-4) clears it.
4. **Anomaly alerts** — Telegram webhook on circuit-breaker events (worker
crash > 5/h, queue stuck > 30 min, BPB regression > 0.1 across 3 ticks).

## Configuration

| Env var | Default | Notes |
|---|---|---|
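| `NEON_DATABASE_URL` | required | pooler URL, session mode; falls back to `TRIOS_NEON_DSN`, then `DATABASE_URL` |
| `RAILWAY_API_TOKEN` | required for redeploys | read as optional at startup; needed once redeploy calls land (PR-2) |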
| `WORKER_DEAD_AFTER_SECS` | 120 | tune to network jitter |
| `LEDGER_TICK_SECS` | 30 | tick cadence |
| `LEDGER_MAX_REPLICAS` | 12 | autoscaler ceiling |
| `LEDGER_MIN_REPLICAS` | 2 | autoscaler floor |
| `LEDGER_LEAK_BPB_THRESHOLD` | 0.1 | flag-as-leak below this |
| `LEDGER_TELEGRAM_TOKEN` | optional | omit to skip alerts |
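| `LEDGER_TELEGRAM_CHAT` | optional | |
| `LEDGER_STUCK_HOURS` | 1 | requeue `running` rows older than this many hours |
| `LEDGER_DESTRUCTIVE_OK` | off | set to `1` or `true` to allow destructive actions (R9) |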

## Standing rules

- **R1**: Rust only.
- **R5**: never overclaim. Every action emits a `gardener_runs` audit row.
- **R7**: triplet on every audit emit (`RAIL=<verb> @ project=… service=… …`).
- **R9**: destructive actions (delete service) require `confirm: true` flag
passed via env `LEDGER_DESTRUCTIVE_OK=1` — defaults off.

phi^2 + phi^-2 = 3 · TRINITY · NEVER STOP.
42 changes: 42 additions & 0 deletions bin/ledger-daemon/src/audit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! R7 triplet audit helper — every ledger-daemon mutation emits one row to
//! `gardener_runs`:
//!
//! action TEXT e.g. 'resurrect_candidate', 'autoscale_compute', 'leak_flag', 'zap_stuck'
//! lane TEXT service name or canon prefix
//! seed INT (nullable)
//! before_bpb REAL (nullable) for leak_flag only
//! after_bpb REAL (nullable)
//! decision JSONB {"reason":..., "target":..., "dry_run":...}
//!
//! All timestamps are server-side `now()`.

use anyhow::{Context, Result};
use serde_json::Value as Json;

/// Appends one audit row to `gardener_runs` and mirrors it to the tracing log.
///
/// * `action` / `lane` — stored verbatim in the columns of the same name.
/// * `seed`, `before_bpb`, `after_bpb` — stored as SQL `NULL` when `None`.
/// * `decision` — arbitrary JSON payload, stored in the `decision` JSONB column.
///
/// The row timestamp is the server-side `now()`.
///
/// # Errors
///
/// Returns the database error, wrapped with the `action` and `lane` for context,
/// when the INSERT fails. Nothing is logged in that case.
pub async fn emit(
    client: &tokio_postgres::Client,
    action: &str,
    lane: &str,
    seed: Option<i32>,
    before_bpb: Option<f64>,
    after_bpb: Option<f64>,
    decision: Json,
) -> Result<()> {
    // The payload is bound as TEXT and converted to JSONB by the server.
    // A placeholder written as `$6::jsonb` is typed JSONB, and tokio-postgres
    // refuses to serialize a Rust `String` into a JSONB parameter, so the
    // intermediate `::text` cast is what makes the bind succeed.
    let decision_text = decision.to_string();

    // `$4` / `$5` are pinned to float8 because a Rust `f64` can only be bound to
    // a float8 parameter. If the columns are REAL (as the module header states),
    // an uncast placeholder would be inferred as float4 and rejected client-side;
    // with the cast the server narrows the value on assignment instead.
    client
        .execute(
            "INSERT INTO gardener_runs (ts, action, lane, seed, before_bpb, after_bpb, decision) \
             VALUES (now(), $1, $2, $3, $4::float8, $5::float8, $6::text::jsonb)",
            &[
                &action,
                &lane,
                &seed,
                &before_bpb,
                &after_bpb,
                &decision_text,
            ],
        )
        .await
        .with_context(|| format!("gardener_runs emit (action={action} lane={lane})"))?;
    tracing::info!(action, lane, ?seed, ?after_bpb, %decision, "🪲 audit");
    Ok(())
}
51 changes: 51 additions & 0 deletions bin/ledger-daemon/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//! Config parsing from env vars. R5-honest: required vars fail loud at startup.

use anyhow::{Context, Result};

/// Runtime settings for the ledger daemon, populated by `Config::from_env`.
#[derive(Debug, Clone)]
#[allow(dead_code)] // PR-1 scaffold; fields wired in PR-2..PR-5.
pub struct Config {
    /// Postgres connection string. Taken from `NEON_DATABASE_URL`, else
    /// `TRIOS_NEON_DSN`, else `DATABASE_URL`; startup fails if none is set.
    pub neon_dsn: String,
    /// Value of `RAILWAY_API_TOKEN`, or `None` when the variable is not set.
    pub railway_api_token: Option<String>,
    /// `LEDGER_TICK_SECS` (default 30).
    pub tick_secs: u64,
    /// `WORKER_DEAD_AFTER_SECS` (default 120): heartbeat age, in seconds, beyond
    /// which the resurrector job reports a worker as dead.
    pub worker_dead_after_secs: i64,
    /// `LEDGER_MAX_REPLICAS` (default 12): upper bound used by the autoscaler.
    pub max_replicas: u32,
    /// `LEDGER_MIN_REPLICAS` (default 2): lower bound used by the autoscaler.
    pub min_replicas: u32,
    /// `LEDGER_LEAK_BPB_THRESHOLD` (default 0.1): `done` rows whose `final_bpb`
    /// is below this value are flagged by the leak gate.
    pub leak_bpb_threshold: f64,
    /// `LEDGER_STUCK_HOURS` (default 1): `running` rows started longer ago than
    /// this many hours are requeued by the stuck-running job.
    pub stuck_running_after_hours: i64,
    /// Value of `LEDGER_TELEGRAM_TOKEN`, or `None` when the variable is not set.
    pub telegram_token: Option<String>,
    /// Value of `LEDGER_TELEGRAM_CHAT`, or `None` when the variable is not set.
    pub telegram_chat: Option<String>,
    /// `true` only when `LEDGER_DESTRUCTIVE_OK` is `1` or `true`
    /// (case-insensitive); `false` otherwise, including when unset.
    pub destructive_ok: bool,
}

impl Config {
pub fn from_env() -> Result<Self> {
let neon_dsn = std::env::var("NEON_DATABASE_URL")
.or_else(|_| std::env::var("TRIOS_NEON_DSN"))
.or_else(|_| std::env::var("DATABASE_URL"))
.context("NEON_DATABASE_URL (or TRIOS_NEON_DSN / DATABASE_URL) required")?;

Ok(Self {
neon_dsn,
railway_api_token: std::env::var("RAILWAY_API_TOKEN").ok(),
tick_secs: env_or("LEDGER_TICK_SECS", 30),
worker_dead_after_secs: env_or("WORKER_DEAD_AFTER_SECS", 120),
max_replicas: env_or("LEDGER_MAX_REPLICAS", 12),
min_replicas: env_or("LEDGER_MIN_REPLICAS", 2),
leak_bpb_threshold: env_or("LEDGER_LEAK_BPB_THRESHOLD", 0.1_f64),
stuck_running_after_hours: env_or("LEDGER_STUCK_HOURS", 1),
telegram_token: std::env::var("LEDGER_TELEGRAM_TOKEN").ok(),
telegram_chat: std::env::var("LEDGER_TELEGRAM_CHAT").ok(),
destructive_ok: std::env::var("LEDGER_DESTRUCTIVE_OK")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false),
})
}
}

/// Reads `key` from the environment and parses it as `T`, falling back to `default`.
///
/// * Unset variable: returns `default` silently.
/// * Set but not parsable as `T`, or not valid Unicode: returns `default` and
///   writes a one-line warning to stderr, so a typo in a tuning knob does not
///   silently run the daemon with a value the operator did not ask for.
///
/// The warning goes to stderr directly so it does not depend on a tracing
/// subscriber being installed. The raw value is deliberately not printed.
fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
    match std::env::var(key) {
        Ok(raw) => match raw.parse::<T>() {
            Ok(parsed) => parsed,
            Err(_) => {
                eprintln!(
                    "ledger-daemon: env var {key} is set but is not a valid {}; using default",
                    std::any::type_name::<T>()
                );
                default
            }
        },
        Err(std::env::VarError::NotPresent) => default,
        Err(std::env::VarError::NotUnicode(_)) => {
            eprintln!("ledger-daemon: env var {key} is not valid Unicode; using default");
            default
        }
    }
}
205 changes: 205 additions & 0 deletions bin/ledger-daemon/src/jobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
//! Four jobs of the Scarabaeus watchdog.
//!
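//! PR-1 (this file) is the SCAFFOLD. `resurrect_dead_workers` and `autoscale`
//! only run a SELECT / COUNT query and emit an audit row with
//! `decision.dry_run=true`; their Railway mutations (redeploy, replica patch)
//! land in PR-2..PR-5. `leak_gate` and `zap_stuck_running` already execute
//! their `UPDATE experiment_queue` statements and audit each affected row.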

use anyhow::Result;
use serde_json::json;

use crate::audit;
use crate::config::Config;

/// Job 1 — dead-worker resurrector (dry-run scaffold).
///
/// Selects up to 100 rows from `workers` whose `last_heartbeat` is older than
/// `cfg.worker_dead_after_secs` seconds, oldest heartbeat first, and emits one
/// `resurrect_candidate` audit row per worker with `dry_run: true`. Nothing in
/// `workers` is modified and no redeploy is issued here.
///
/// Mutation (PR-2): Railway GraphQL `serviceInstanceRedeploy` with scope=account.
///
/// # Errors
///
/// Propagates the first query or audit-emit failure.
pub async fn resurrect_dead_workers(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
    const STALE_WORKERS_SQL: &str = "SELECT id::text, railway_acc, railway_svc_name, \
         EXTRACT(EPOCH FROM now() - last_heartbeat)::int8 AS age_s \
         FROM workers \
         WHERE last_heartbeat < now() - ($1::text || ' seconds')::interval \
         ORDER BY last_heartbeat ASC \
         LIMIT 100";

    // The threshold is bound as text and turned into an interval server-side.
    let threshold_secs = cfg.worker_dead_after_secs.to_string();
    let stale = client.query(STALE_WORKERS_SQL, &[&threshold_secs]).await?;

    for worker in stale.iter() {
        let worker_id: String = worker.get(0);
        // `try_get(..).ok()` maps a NULL (or otherwise undecodable) column to `None`.
        let account = worker.try_get::<_, String>(1).ok();
        let service = worker.try_get::<_, String>(2).ok();
        let heartbeat_age: i64 = worker.get(3);

        let lane = service.as_deref().unwrap_or("unknown");
        let details = json!({
            "worker_id": worker_id,
            "account": account,
            "heartbeat_age_s": heartbeat_age,
            "threshold_s": cfg.worker_dead_after_secs,
            "dry_run": true,
            "todo_pr_2": "Railway serviceInstanceRedeploy",
        });

        audit::emit(client, "resurrect_candidate", lane, None, None, None, details).await?;
    }

    let dead = stale.len();
    if dead > 0 {
        tracing::warn!(count = dead, "dead workers detected (dry-run)");
    }
    Ok(())
}

/// Job 2 — autoscaler (dry-run scaffold).
///
/// Counts `experiment_queue` rows that are `pending` and already due
/// (`scheduled_at <= now()`), derives a replica target of one replica per ten
/// queued rows (integer division) bounded by `cfg.min_replicas` and
/// `cfg.max_replicas`, and records the result as an `autoscale_compute` audit
/// row with `dry_run: true`. No Railway call is made here.
///
/// Decision (PR-3): patch Railway service replicas to the computed target.
///
/// # Errors
///
/// Propagates the count query or audit-emit failure.
pub async fn autoscale(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
    const PENDING_COUNT_SQL: &str = "SELECT count(*) FROM experiment_queue \
         WHERE status='pending' AND scheduled_at <= now()";

    let queue_depth: i64 = client.query_one(PENDING_COUNT_SQL, &[]).await?.get(0);

    let floor = i64::from(cfg.min_replicas);
    let ceiling = i64::from(cfg.max_replicas);
    // max-then-min rather than `clamp`: when min > max this yields `max`, whereas
    // `clamp` would panic. The result always lies between 0 and a `u32`-derived
    // bound, so the cast back to `u32` is lossless.
    let desired_replicas = (queue_depth / 10).max(floor).min(ceiling) as u32;

    let details = json!({
        "queue_depth": queue_depth,
        "desired_replicas": desired_replicas,
        "min": cfg.min_replicas,
        "max": cfg.max_replicas,
        "dry_run": true,
        "todo_pr_3": "Railway service replicas patch",
    });

    audit::emit(
        client,
        "autoscale_compute",
        "seed-agent-pool",
        None,
        None,
        None,
        details,
    )
    .await?;
    Ok(())
}

/// Job 3 — leak gate.
///
/// SQL:
/// ```sql
/// UPDATE experiment_queue SET last_error = 'SCARABAEUS-LEAK-CANDIDATE: bpb<threshold'
/// WHERE status='done' AND final_bpb < $1 AND (last_error IS NULL OR last_error NOT LIKE 'SCARABAEUS-%')
/// RETURNING id, canon_name, final_bpb
/// ```
pub async fn leak_gate(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
let rows = client
.query(
"UPDATE experiment_queue \
SET last_error = 'SCARABAEUS-LEAK-CANDIDATE: bpb<' || $1 \
WHERE status='done' \
AND final_bpb IS NOT NULL \
AND final_bpb < $1 \
AND (last_error IS NULL OR last_error NOT LIKE 'SCARABAEUS-%') \
RETURNING id, canon_name, final_bpb",
&[&cfg.leak_bpb_threshold],
)
.await?;

for row in &rows {
let id: i64 = row.get(0);
let canon: String = row.get(1);
let bpb: f64 = row.get(2);
audit::emit(
client,
"leak_flag",
&canon,
None,
Some(bpb),
Some(bpb),
json!({
"queue_id": id,
"threshold": cfg.leak_bpb_threshold,
"reason": "bpb below subfloor; requires heldout eval (Khepri-4)",
}),
)
.await?;
}

if !rows.is_empty() {
tracing::warn!(count = rows.len(), "leak candidates flagged");
}
Ok(())
}

/// Job 4 — stuck-running zapper.
///
/// Requeues rows that have been `status='running'` longer than
/// `stuck_running_after_hours` via Khepri-2 semantics (retry with backoff).
pub async fn zap_stuck_running(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
let rows = client
.query(
"UPDATE experiment_queue \
SET status = CASE WHEN attempts + 1 >= max_attempts THEN 'dead' ELSE 'pending' END, \
attempts = attempts + 1, \
last_error = 'SCARABAEUS-ZAP-STUCK: running>' || $1 || 'h', \
scheduled_at = now() + (power(2, attempts) * interval '1 minute'), \
worker_id = NULL, \
claimed_at = NULL, \
started_at = NULL, \
finished_at = CASE WHEN attempts + 1 >= max_attempts THEN now() ELSE NULL END \
WHERE status='running' \
AND started_at < now() - ($1::text || ' hours')::interval \
RETURNING id, canon_name, attempts",
&[&cfg.stuck_running_after_hours.to_string()],
)
.await?;

for row in &rows {
let id: i64 = row.get(0);
let canon: String = row.get(1);
let attempts: i32 = row.get(2);
audit::emit(
client,
"zap_stuck",
&canon,
None,
None,
None,
json!({
"queue_id": id,
"attempts_after": attempts,
"threshold_hours": cfg.stuck_running_after_hours,
}),
)
.await?;
}

if !rows.is_empty() {
tracing::warn!(count = rows.len(), "stuck running jobs zapped");
}
Ok(())
}
Loading
Loading