diff --git a/Cargo.lock b/Cargo.lock index f266255a..24f74383 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1088,6 +1088,21 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "ledger-daemon" +version = "0.0.1" +dependencies = [ + "anyhow", + "chrono", + "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-postgres", + "tracing", + "tracing-subscriber", +] + [[package]] name = "libc" version = "0.2.186" diff --git a/Cargo.toml b/Cargo.toml index 527b7ab8..5e05b102 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/trios-igla-race", "bin/tri-railway", "bin/seed-agent", + "bin/ledger-daemon", ] [workspace.package] diff --git a/bin/ledger-daemon/Cargo.toml b/bin/ledger-daemon/Cargo.toml new file mode 100644 index 00000000..1317ec0d --- /dev/null +++ b/bin/ledger-daemon/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "ledger-daemon" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true +repository.workspace = true +description = "Scarabaeus Engine watchdog (Khepri-3): autoscaler, dead-worker resurrector, leak gate, anomaly alerts. Reads experiment_queue / workers / bpb_samples; writes gardener_runs and Railway redeploy calls." 
+ +[[bin]] +name = "ledger-daemon" +path = "src/main.rs" + +[dependencies] +anyhow = "1.0" +tokio = { version = "1.40", features = ["full"] } +tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] } +chrono = { version = "0.4", features = ["serde"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false } diff --git a/bin/ledger-daemon/README.md b/bin/ledger-daemon/README.md new file mode 100644 index 00000000..8f9afa87 --- /dev/null +++ b/bin/ledger-daemon/README.md @@ -0,0 +1,41 @@ +# ledger-daemon — Scarabaeus Engine watchdog (Khepri-3) + +> "Хепри катит шар через Дуат. Скарабей не просит указаний — он просто катит." + +This crate is the **glaze of Khepri** — the eye that sees the whole fleet from +above and corrects course. It runs as a single Railway service (one replica), +ticks every 30 s, and performs four jobs: + +1. **Dead-worker resurrector** — workers whose `last_heartbeat` exceeds + `WORKER_DEAD_AFTER_SECS` (default 120 s) are redeployed via Railway API. +2. **Autoscaler** — computes desired replicas from `(queue_depth, target_throughput)` + and tells the seed-agent service group to grow/shrink (min=2, max=12). +3. **Leak gate** — every `experiment_queue` row that lands `done` with `bpb < 0.1` + is marked `last_error='SCARABAEUS-LEAK-CANDIDATE'` and ignored by ratification + pools until a held-out evaluator (Khepri-4) clears it. +4. **Anomaly alerts** — Telegram webhook on circuit-breaker events (worker + crash > 5/h, queue stuck > 30 min, BPB regression > 0.1 across 3 ticks). 
+
+## Configuration
+
+| Env var | Default | Notes |
+|---|---|---|
+| `NEON_DATABASE_URL` | required | pooler URL, session mode; `TRIOS_NEON_DSN` and `DATABASE_URL` are accepted as fallbacks |
+| `RAILWAY_API_TOKEN` | required for redeploy calls | the daemon currently starts without it (parsed as optional) |
+| `WORKER_DEAD_AFTER_SECS` | 120 | tune to network jitter |
+| `LEDGER_TICK_SECS` | 30 | tick cadence |
+| `LEDGER_MAX_REPLICAS` | 12 | autoscaler ceiling |
+| `LEDGER_MIN_REPLICAS` | 2 | autoscaler floor |
+| `LEDGER_LEAK_BPB_THRESHOLD` | 0.1 | flag-as-leak below this |
+| `LEDGER_STUCK_HOURS` | 1 | rows `running` longer than this are requeued |
+| `LEDGER_TELEGRAM_TOKEN` | optional | omit to skip alerts |
+| `LEDGER_TELEGRAM_CHAT` | optional | |
+| `LEDGER_DESTRUCTIVE_OK` | off | `1` or `true` enables destructive actions (see R9) |
+
+## Standing rules
+
+- **R1**: Rust only.
+- **R5**: never overclaim. Every action emits a `gardener_runs` audit row.
+- **R7**: triplet on every audit emit (`RAIL= @ project=… service=… …`).
+- **R9**: destructive actions (delete service) require `confirm: true` flag
+  passed via env `LEDGER_DESTRUCTIVE_OK=1` — defaults off.
+
+phi^2 + phi^-2 = 3 · TRINITY · NEVER STOP.
diff --git a/bin/ledger-daemon/src/audit.rs b/bin/ledger-daemon/src/audit.rs
new file mode 100644
index 00000000..e4d7f2b5
--- /dev/null
+++ b/bin/ledger-daemon/src/audit.rs
@@ -0,0 +1,42 @@
+//! R7 triplet audit helper — every ledger-daemon mutation emits one row to
+//! `gardener_runs`:
+//!
+//!   action      TEXT   e.g. 'resurrect', 'scale_up', 'leak_flag', 'zap'
+//!   lane        TEXT   service name or canon prefix
+//!   seed        INT    (nullable)
+//!   before_bpb  REAL   (nullable) for leak_flag only
+//!   after_bpb   REAL   (nullable)
+//!   decision    JSONB  {"reason":..., "target":..., "dry_run":...}
+//!
+//! All timestamps are server-side `now()`.
+
+use anyhow::{Context, Result};
+use serde_json::Value as Json;
+
+/// Append one audit row to `gardener_runs` and mirror it to the tracing log.
+///
+/// * `action` / `lane` — free-form labels stored verbatim.
+/// * `seed`, `before_bpb`, `after_bpb` — optional; `None` is stored as NULL.
+/// * `decision` — arbitrary JSON payload stored in the `decision` column.
+///
+/// # Errors
+/// Returns the database error, annotated with `action` and `lane`, when the
+/// INSERT fails. Nothing is logged in that case.
+pub async fn emit(
+    client: &tokio_postgres::Client,
+    action: &str,
+    lane: &str,
+    seed: Option<i32>,
+    before_bpb: Option<f64>,
+    after_bpb: Option<f64>,
+    decision: Json,
+) -> Result<()> {
+    // The payload is sent as TEXT and converted server-side. A bare
+    // `$6::jsonb` makes the server report the placeholder as `jsonb`, and
+    // tokio-postgres refuses to serialize a Rust `String` into that type.
+    let payload = decision.to_string();
+
+    client
+        .execute(
+            // `::float8` pins the wire type of the f64 parameters; the server
+            // then applies its own assignment cast if the column is narrower
+            // (the module header documents the bpb columns as REAL).
+            "INSERT INTO gardener_runs (ts, action, lane, seed, before_bpb, after_bpb, decision) \
+             VALUES (now(), $1, $2, $3, $4::float8, $5::float8, $6::text::jsonb)",
+            &[&action, &lane, &seed, &before_bpb, &after_bpb, &payload],
+        )
+        .await
+        .with_context(|| format!("gardener_runs emit (action={action} lane={lane})"))?;
+    tracing::info!(action, lane, ?seed, ?after_bpb, %decision, "🪲 audit");
+    Ok(())
+}
diff --git a/bin/ledger-daemon/src/config.rs b/bin/ledger-daemon/src/config.rs
new file mode 100644
index 00000000..8a8ffd7c
--- /dev/null
+++ b/bin/ledger-daemon/src/config.rs
@@ -0,0 +1,51 @@
+//! Config parsing from env vars. R5-honest: required vars fail loud at startup.
+
+use anyhow::{Context, Result};
+
+#[derive(Debug, Clone)]
+#[allow(dead_code)] // PR-1 scaffold; fields wired in PR-2..PR-5.
+/// Runtime settings of the daemon, read once from the process environment.
+///
+/// `neon_dsn`, `railway_api_token` and `telegram_token` hold credentials, so
+/// avoid printing the whole struct through its derived `Debug`.
+pub struct Config {
+    pub neon_dsn: String,
+    pub railway_api_token: Option<String>,
+    pub tick_secs: u64,
+    pub worker_dead_after_secs: i64,
+    pub max_replicas: u32,
+    pub min_replicas: u32,
+    pub leak_bpb_threshold: f64,
+    pub stuck_running_after_hours: i64,
+    pub telegram_token: Option<String>,
+    pub telegram_chat: Option<String>,
+    pub destructive_ok: bool,
+}
+
+impl Config {
+    /// Build the configuration from environment variables.
+    ///
+    /// # Errors
+    /// Fails when no DSN variable is set (`NEON_DATABASE_URL`, then
+    /// `TRIOS_NEON_DSN`, then `DATABASE_URL`) or when a value would make the
+    /// daemon misbehave: a zero tick period, or a non-positive dead-worker /
+    /// stuck-running threshold.
+    pub fn from_env() -> Result<Self> {
+        let neon_dsn = std::env::var("NEON_DATABASE_URL")
+            .or_else(|_| std::env::var("TRIOS_NEON_DSN"))
+            .or_else(|_| std::env::var("DATABASE_URL"))
+            .context("NEON_DATABASE_URL (or TRIOS_NEON_DSN / DATABASE_URL) required")?;
+
+        let cfg = Self {
+            neon_dsn,
+            railway_api_token: std::env::var("RAILWAY_API_TOKEN").ok(),
+            tick_secs: env_or("LEDGER_TICK_SECS", 30),
+            worker_dead_after_secs: env_or("WORKER_DEAD_AFTER_SECS", 120),
+            max_replicas: env_or("LEDGER_MAX_REPLICAS", 12),
+            min_replicas: env_or("LEDGER_MIN_REPLICAS", 2),
+            leak_bpb_threshold: env_or("LEDGER_LEAK_BPB_THRESHOLD", 0.1_f64),
+            stuck_running_after_hours: env_or("LEDGER_STUCK_HOURS", 1),
+            telegram_token: std::env::var("LEDGER_TELEGRAM_TOKEN").ok(),
+            telegram_chat: std::env::var("LEDGER_TELEGRAM_CHAT").ok(),
+            destructive_ok: std::env::var("LEDGER_DESTRUCTIVE_OK")
+                .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
+                .unwrap_or(false),
+        };
+
+        // A zero period makes `tokio::time::interval` panic at startup.
+        anyhow::ensure!(cfg.tick_secs > 0, "LEDGER_TICK_SECS must be at least 1");
+        // A zero or negative threshold would classify every worker as dead.
+        anyhow::ensure!(
+            cfg.worker_dead_after_secs > 0,
+            "WORKER_DEAD_AFTER_SECS must be positive (got {})",
+            cfg.worker_dead_after_secs
+        );
+        // A zero or negative threshold would requeue every running job.
+        anyhow::ensure!(
+            cfg.stuck_running_after_hours > 0,
+            "LEDGER_STUCK_HOURS must be positive (got {})",
+            cfg.stuck_running_after_hours
+        );
+
+        Ok(cfg)
+    }
+}
+
+/// Read `key` from the environment and parse it as `T`.
+///
+/// Returns `default` when the variable is unset or cannot be parsed. A value
+/// that is present but unparseable is reported with a warning instead of being
+/// dropped silently, so a typo in a tuning knob shows up in the logs.
+fn env_or<T: std::str::FromStr>(key: &str, default: T) -> T {
+    let Ok(raw) = std::env::var(key) else {
+        return default;
+    };
+    let trimmed = raw.trim();
+    match trimmed.parse() {
+        Ok(value) => value,
+        Err(_) => {
+            // An empty value is treated like an unset variable: no warning.
+            if !trimmed.is_empty() {
+                tracing::warn!(key, value = %raw, "unparseable env value; falling back to default");
+            }
+            default
+        }
+    }
+}
diff --git a/bin/ledger-daemon/src/jobs.rs b/bin/ledger-daemon/src/jobs.rs
new file mode 100644
index 00000000..7249e068
--- /dev/null
+++ b/bin/ledger-daemon/src/jobs.rs
@@ -0,0 +1,205 @@
+//! Four jobs of the Scarabaeus watchdog.
+//!
+//! PR-1 (this file) is the SCAFFOLD. Each function performs the SELECT / COUNT
+//! query and emits an audit row with `decision.dry_run=true`. The actual
+//! mutations (Railway redeploy, requeue UPDATE) are landed in PR-2..PR-5.
+
+use anyhow::Result;
+use serde_json::json;
+
+use crate::audit;
+use crate::config::Config;
+
+/// Job 1 — dead-worker resurrector.
+///
+/// SQL:
+/// ```sql
+/// SELECT id, railway_acc, railway_svc_name, EXTRACT(EPOCH FROM now() - last_heartbeat)::int AS age_s
+/// FROM workers
+/// WHERE last_heartbeat < now() - ($1 || ' seconds')::interval
+/// ```
+///
+/// Mutation (PR-2): Railway GraphQL `serviceInstanceRedeploy` with scope=account.
+///
+/// Current behaviour: selects at most 100 workers, oldest heartbeat first, and
+/// emits one `resurrect_candidate` audit row per worker with `dry_run: true`.
+/// Nothing is redeployed.
+///
+/// # Errors
+/// Returns the first query, row-decoding or audit error. Workers after the
+/// failing one are not audited in this tick.
+pub async fn resurrect_dead_workers(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
+    let threshold = cfg.worker_dead_after_secs.to_string();
+    let rows = client
+        .query(
+            "SELECT id::text, railway_acc, railway_svc_name, \
+                    EXTRACT(EPOCH FROM now() - last_heartbeat)::int8 AS age_s \
+             FROM workers \
+             WHERE last_heartbeat < now() - ($1::text || ' seconds')::interval \
+             ORDER BY last_heartbeat ASC \
+             LIMIT 100",
+            &[&threshold],
+        )
+        .await?;
+
+    for row in &rows {
+        // `try_get` + `?` instead of `get`: a decoding problem becomes a job
+        // error rather than a panic that takes the daemon down.
+        let id: String = row.try_get(0)?;
+        // Account and service name stay best-effort: NULL or an unexpected
+        // column type both degrade to `None`.
+        let acc: Option<String> = row.try_get::<_, Option<String>>(1).ok().flatten();
+        let svc: Option<String> = row.try_get::<_, Option<String>>(2).ok().flatten();
+        let age_s: i64 = row.try_get(3)?;
+
+        audit::emit(
+            client,
+            "resurrect_candidate",
+            svc.as_deref().unwrap_or("unknown"),
+            None,
+            None,
+            None,
+            json!({
+                "worker_id": id,
+                "account": acc,
+                "heartbeat_age_s": age_s,
+                "threshold_s": cfg.worker_dead_after_secs,
+                "dry_run": true,
+                "todo_pr_2": "Railway serviceInstanceRedeploy",
+            }),
+        )
+        .await?;
+    }
+
+    if !rows.is_empty() {
+        tracing::warn!(count = rows.len(), "dead workers detected (dry-run)");
+    }
+    Ok(())
+}
+
+/// Job 2 — autoscaler.
+///
+/// SQL:
+/// ```sql
+/// SELECT count(*) FROM experiment_queue
+/// WHERE status='pending' AND scheduled_at <= now()
+/// ```
+///
+/// Decision (PR-3): `desired = clamp(queue_depth / 10, MIN_REPLICAS, MAX_REPLICAS)`
+/// and patch Railway service replicas accordingly.
+pub async fn autoscale(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
+    let row = client
+        .query_one(
+            "SELECT count(*) FROM experiment_queue \
+             WHERE status='pending' AND scheduled_at <= now()",
+            &[],
+        )
+        .await?;
+    let depth: i64 = row.try_get(0)?;
+    // One replica per ten runnable jobs, bounded by the configured floor and
+    // ceiling. The ceiling is applied last, so it wins if min > max, and the
+    // result always fits in u32.
+    let desired = (depth / 10)
+        .max(cfg.min_replicas as i64)
+        .min(cfg.max_replicas as i64) as u32;
+
+    audit::emit(
+        client,
+        "autoscale_compute",
+        "seed-agent-pool",
+        None,
+        None,
+        None,
+        json!({
+            "queue_depth": depth,
+            "desired_replicas": desired,
+            "min": cfg.min_replicas,
+            "max": cfg.max_replicas,
+            "dry_run": true,
+            "todo_pr_3": "Railway service replicas patch",
+        }),
+    )
+    .await?;
+    Ok(())
+}
+
+/// Job 3 — leak gate.
+///
+/// SQL:
+/// ```sql
+/// UPDATE experiment_queue
+///    SET last_error = 'SCARABAEUS-LEAK-CANDIDATE: bpb<' || $1
+///  WHERE status='done'
+///    AND final_bpb IS NOT NULL
+///    AND final_bpb < $1
+///    AND (last_error IS NULL OR last_error NOT LIKE 'SCARABAEUS-%')
+/// RETURNING id, canon_name, final_bpb
+/// ```
+///
+/// Unlike jobs 1 and 2 this is a real mutation: matching rows are flagged
+/// immediately, then one `leak_flag` audit row is emitted per flagged row.
+///
+/// # Errors
+/// Returns the UPDATE error, or the first audit failure once every flagged
+/// row has been attempted.
+pub async fn leak_gate(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
+    let rows = client
+        .query(
+            // `$1` is used both as a number and as part of a string. The
+            // explicit casts pin it to float8, matching the f64 sent by the
+            // client, instead of leaving the type to server-side inference.
+            "UPDATE experiment_queue \
+             SET last_error = 'SCARABAEUS-LEAK-CANDIDATE: bpb<' || $1::float8::text \
+             WHERE status='done' \
+               AND final_bpb IS NOT NULL \
+               AND final_bpb < $1::float8 \
+               AND (last_error IS NULL OR last_error NOT LIKE 'SCARABAEUS-%') \
+             RETURNING id::int8, canon_name, final_bpb::float8",
+            &[&cfg.leak_bpb_threshold],
+        )
+        .await?;
+
+    // The rows are already flagged and will not match the UPDATE again, so a
+    // failing audit emit must not stop the audits for the remaining rows.
+    let mut first_failure: Option<anyhow::Error> = None;
+    for row in &rows {
+        if let Err(e) = audit_leak_flag(client, cfg, row).await {
+            tracing::error!("leak_flag audit emit failed (row already flagged): {:#}", e);
+            first_failure.get_or_insert(e);
+        }
+    }
+
+    if !rows.is_empty() {
+        tracing::warn!(count = rows.len(), "leak candidates flagged");
+    }
+    first_failure.map_or(Ok(()), Err)
+}
+
+/// Decode one row returned by the leak-gate UPDATE and emit its audit record.
+async fn audit_leak_flag(
+    client: &tokio_postgres::Client,
+    cfg: &Config,
+    row: &tokio_postgres::Row,
+) -> Result<()> {
+    let id: i64 = row.try_get(0)?;
+    let canon: String = row.try_get(1)?;
+    let bpb: f64 = row.try_get(2)?;
+    audit::emit(
+        client,
+        "leak_flag",
+        &canon,
+        None,
+        Some(bpb),
+        Some(bpb),
+        json!({
+            "queue_id": id,
+            "threshold": cfg.leak_bpb_threshold,
+            "reason": "bpb below subfloor; requires heldout eval (Khepri-4)",
+        }),
+    )
+    .await
+}
+
+/// Job 4 — stuck-running zapper.
+///
+/// Requeues rows that have been `status='running'` longer than
+/// `stuck_running_after_hours` via Khepri-2 semantics (retry with backoff).
+///
+/// A row whose incremented `attempts` reaches `max_attempts` becomes `dead`
+/// with `finished_at` set; every other row returns to `pending` with
+/// `scheduled_at` pushed out by `2^attempts` minutes (the pre-increment
+/// count). One `zap_stuck` audit row is emitted per affected row.
+///
+/// # Errors
+/// Returns the UPDATE error, or the first audit failure once every affected
+/// row has been attempted.
+pub async fn zap_stuck_running(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
+    let threshold_hours = cfg.stuck_running_after_hours.to_string();
+    let rows = client
+        .query(
+            "UPDATE experiment_queue \
+             SET status = CASE WHEN attempts + 1 >= max_attempts THEN 'dead' ELSE 'pending' END, \
+                 attempts = attempts + 1, \
+                 last_error = 'SCARABAEUS-ZAP-STUCK: running>' || $1 || 'h', \
+                 scheduled_at = now() + (power(2, attempts) * interval '1 minute'), \
+                 worker_id = NULL, \
+                 claimed_at = NULL, \
+                 started_at = NULL, \
+                 finished_at = CASE WHEN attempts + 1 >= max_attempts THEN now() ELSE NULL END \
+             WHERE status='running' \
+               AND started_at < now() - ($1::text || ' hours')::interval \
+             RETURNING id::int8, canon_name, attempts::int4",
+            &[&threshold_hours],
+        )
+        .await?;
+
+    // The requeue has already happened and cannot be replayed, so a failing
+    // audit emit must not stop the audits for the remaining rows.
+    let mut first_failure: Option<anyhow::Error> = None;
+    for row in &rows {
+        if let Err(e) = audit_zap(client, cfg, row).await {
+            tracing::error!("zap_stuck audit emit failed (row already requeued): {:#}", e);
+            first_failure.get_or_insert(e);
+        }
+    }
+
+    if !rows.is_empty() {
+        tracing::warn!(count = rows.len(), "stuck running jobs zapped");
+    }
+    first_failure.map_or(Ok(()), Err)
+}
+
+/// Decode one row returned by the zapper UPDATE and emit its audit record.
+async fn audit_zap(
+    client: &tokio_postgres::Client,
+    cfg: &Config,
+    row: &tokio_postgres::Row,
+) -> Result<()> {
+    let id: i64 = row.try_get(0)?;
+    let canon: String = row.try_get(1)?;
+    let attempts: i32 = row.try_get(2)?;
+    audit::emit(
+        client,
+        "zap_stuck",
+        &canon,
+        None,
+        None,
+        None,
+        json!({
+            "queue_id": id,
+            "attempts_after": attempts,
+            "threshold_hours": cfg.stuck_running_after_hours,
+        }),
+    )
+    .await
+}
diff --git a/bin/ledger-daemon/src/main.rs b/bin/ledger-daemon/src/main.rs
new file mode 100644
index 00000000..4ef01446
--- /dev/null
+++ b/bin/ledger-daemon/src/main.rs
@@ -0,0 +1,104 @@
+//! ledger-daemon — Scarabaeus Engine watchdog (Khepri-3).
+//!
+//! One tick every `LEDGER_TICK_SECS` (default 30 s). Four jobs per tick:
+//!
+//! 1. Dead-worker resurrector: `workers.last_heartbeat` older than
+//!    `WORKER_DEAD_AFTER_SECS` → Railway redeploy.
+//! 2. Autoscaler: `queue_depth = COUNT(experiment_queue WHERE status='pending'
+//!    AND scheduled_at <= now())` → target N replicas.
+//! 3. Leak gate: `experiment_queue` rows landing `done` with `bpb < 0.1`
+//!    get `last_error='SCARABAEUS-LEAK-CANDIDATE'`.
+//! 4. Stuck-job zapper: rows `status='running' AND started_at <
+//!
now() - interval '1 hour'` are requeued via Khepri-2 semantics.
+//!
+//! All mutations emit one audit row into `gardener_runs` with the R7 triplet.
+//!
+//! **STATUS: PR-1 scaffold**. Each job is a TODO stub with the exact SQL it
+//! will run — wires up postgres client, tick loop, graceful shutdown, and
+//! audit emit helper. The four jobs are implemented in PR-2..PR-5, one per
+//! sub-issue of [trios-railway#101](https://github.com/gHashTag/trios-railway/issues/101).
+//!
+//! Anchor: `phi^2 + phi^-2 = 3`.
+
+use std::time::Duration;
+
+use anyhow::{Context, Result};
+use tokio::signal;
+use tokio::time::{interval, MissedTickBehavior};
+use tokio_postgres::NoTls;
+
+mod audit;
+mod config;
+mod jobs;
+
+use crate::config::Config;
+
+/// Entry point: initialise logging, load the config, connect to Postgres and
+/// run one tick per period until a shutdown signal arrives.
+///
+/// # Errors
+/// Fails on invalid configuration, on the initial connection, or when the
+/// connection is found closed at the start of a tick. There is no in-process
+/// reconnect, so the process exits and leaves the restart to its supervisor.
+#[tokio::main]
+async fn main() -> Result<()> {
+    tracing_subscriber::fmt()
+        .with_env_filter(
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
+        )
+        .init();
+
+    let cfg = Config::from_env().context("load config")?;
+    // Log individual non-secret fields only. `?cfg` would write the DSN
+    // (including its password) and the API tokens to the log stream.
+    tracing::info!(
+        tick_secs = cfg.tick_secs,
+        worker_dead_after_secs = cfg.worker_dead_after_secs,
+        min_replicas = cfg.min_replicas,
+        max_replicas = cfg.max_replicas,
+        leak_bpb_threshold = cfg.leak_bpb_threshold,
+        stuck_running_after_hours = cfg.stuck_running_after_hours,
+        railway_token_set = cfg.railway_api_token.is_some(),
+        telegram_configured = cfg.telegram_token.is_some() && cfg.telegram_chat.is_some(),
+        destructive_ok = cfg.destructive_ok,
+        "🪲 ledger-daemon starting (Khepri-3 watchdog)"
+    );
+
+    // Connect to Neon. Session-mode pooler recommended for long-lived
+    // connections.
+    // NOTE(review): `NoTls` sends credentials in clear text and is presumably
+    // rejected by a TLS-only endpoint — confirm against the target database;
+    // switching needs a TLS connector crate that is not a dependency yet.
+    let (client, connection) = tokio_postgres::connect(&cfg.neon_dsn, NoTls)
+        .await
+        .context("neon connect")?;
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            tracing::error!(?e, "neon connection task died");
+        }
+    });
+    tracing::info!("neon connection OK");
+
+    // `interval` panics on a zero period, so clamp to at least one second.
+    let mut tick = interval(Duration::from_secs(cfg.tick_secs.max(1)));
+    tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
+
+    let shutdown = shutdown_signal();
+    tokio::pin!(shutdown);
+
+    loop {
+        tokio::select! {
+            _ = tick.tick() => {
+                // Once the connection task has ended, every query fails
+                // forever. Exit with an error instead of warning each tick.
+                if client.is_closed() {
+                    anyhow::bail!("neon connection closed; exiting so the daemon can be restarted");
+                }
+                if let Err(e) = tick_once(&client, &cfg).await {
+                    tracing::warn!(?e, "tick failed (non-fatal, will retry next tick)");
+                }
+            }
+            _ = &mut shutdown => {
+                tracing::info!("shutdown signal received; exiting cleanly");
+                break;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Resolve when the process is asked to stop: Ctrl-C everywhere, plus SIGTERM
+/// on Unix (the usual stop signal from container platforms).
+async fn shutdown_signal() {
+    #[cfg(unix)]
+    {
+        match signal::unix::signal(signal::unix::SignalKind::terminate()) {
+            Ok(mut sigterm) => {
+                tokio::select! {
+                    _ = signal::ctrl_c() => {}
+                    _ = sigterm.recv() => {}
+                }
+            }
+            Err(e) => {
+                tracing::warn!(?e, "SIGTERM handler unavailable; falling back to Ctrl-C only");
+                let _ = signal::ctrl_c().await;
+            }
+        }
+    }
+    #[cfg(not(unix))]
+    {
+        let _ = signal::ctrl_c().await;
+    }
+}
+
+/// Run the four jobs once, in a fixed order.
+///
+/// Each job returns a `Result<()>`; errors are collected and logged but never
+/// abort the tick — one failing job must not starve the other three. The
+/// function therefore always returns `Ok(())`.
+async fn tick_once(client: &tokio_postgres::Client, cfg: &Config) -> Result<()> {
+    let mut errors: Vec<String> = Vec::new();
+
+    if let Err(e) = jobs::resurrect_dead_workers(client, cfg).await {
+        errors.push(format!("resurrect: {e:#}"));
+    }
+    if let Err(e) = jobs::autoscale(client, cfg).await {
+        errors.push(format!("autoscale: {e:#}"));
+    }
+    if let Err(e) = jobs::leak_gate(client, cfg).await {
+        errors.push(format!("leak_gate: {e:#}"));
+    }
+    if let Err(e) = jobs::zap_stuck_running(client, cfg).await {
+        errors.push(format!("zap_stuck: {e:#}"));
+    }
+
+    if errors.is_empty() {
+        tracing::info!("tick ok");
+    } else {
+        tracing::warn!(?errors, "tick completed with job errors");
+    }
+    Ok(())
+}