diff --git a/docs/src/specialized/admin/server-deployment.md b/docs/src/specialized/admin/server-deployment.md index 55ad8282..78a7ea01 100644 --- a/docs/src/specialized/admin/server-deployment.md +++ b/docs/src/specialized/admin/server-deployment.md @@ -141,6 +141,9 @@ RUST_LOG=debug torc-server run --log-dir /var/log/torc `./torc-server-snapshot.db`). See [In-Memory Database with Snapshots](#in-memory-database-with-snapshots-advanced). - `TORC_SERVER_SNAPSHOT_KEEP`: Number of snapshots to retain (default `5`, minimum `1`) +- `TORC_API_EVENT_BODY_MAX_BYTES`: Per-direction cap on captured request/response bodies surfaced + through `torc admin tail-api` (default `8192`). See + [Live API Request Inspection](#live-api-request-inspection). Example: @@ -637,6 +640,117 @@ For Docker/Kubernetes deployments, call `torc admin reload-auth` after updating instead of restarting the container. See [Hot-Reloading Credentials](./authentication.md#hot-reloading-credentials) for details. +## Live API Request Inspection + +`torc admin tail-api` streams a structured event for every inbound HTTP request the server +processes, delivered over Server-Sent Events. It is intended for debugging traffic against a running +server without tailing log files: events are network-accessible, structured, and emitted in real +time. + +```bash +# Watch all API requests as they arrive +torc admin tail-api + +# Include captured request and response bodies (subject to size caps) +torc admin tail-api --include-bodies + +# Stream JSON, one event per line, e.g. for piping to jq +torc -f json admin tail-api | jq . +``` + +Each event includes the HTTP method, path, query string, status code, latency in milliseconds, the +`x-span-id` assigned by the server, and the authenticated user (when one was resolved). 
Output +fields: + +| Field | Description | +| --------------- | ------------------------------------------------------------------------ | +| `timestamp_ms` | Unix epoch milliseconds at which the request finished. | +| `method` | HTTP method (`GET`, `POST`, …). | +| `path` | URL path (without query string). | +| `query` | URL query string, when present. | +| `status` | Final HTTP status code returned to the client. | +| `latency_ms` | Wall-clock duration spent inside the router. | +| `request_id` | Server-assigned `x-span-id` for cross-referencing log lines. | +| `user` | Authenticated subject, or advisory client user (see below). | +| `request_body` | Captured request body, only when `--include-bodies` is set (see below). | +| `response_body` | Captured response body, only when `--include-bodies` is set (see below). | + +### Body capture + +Body capture is **opt-in**. When `--include-bodies` is set, the server captures up to **8 KiB** per +direction and includes truncated UTF-8 text in each event. Override the per-direction display cap +with `TORC_API_EVENT_BODY_MAX_BYTES`. + +Independent of the display cap, the server enforces a **1 MiB hard ceiling** on bytes it will buffer +in memory and only captures bodies whose size is advertised up front (via `Content-Length` or the +body's size hint). Larger payloads, chunked uploads with no advertised length, and SSE response +streams (e.g. `/workflows/{id}/events/stream`) are passed through untouched and emitted with no +body. Binary payloads are still recorded — `bytes` reflects the total length — but the `text` field +is omitted so consumers can detect non-UTF-8 content. + +`Authorization` and `Cookie` headers are never captured under any setting. + +### How the `user` field is populated + +When the server is running with `--auth-file`, the `user` field contains the authenticated subject +resolved from Basic Auth. 
When the server is running **without** `--auth-file`, every request would +otherwise resolve to `anonymous`, which is not very useful for debugging. To improve that, the +`torc` CLI sends an advisory `X-Torc-Client-User` header on every API call, sourced from +`TORC_USERNAME` / `USER` / `USERNAME`. The capture middleware uses this header **only as a +fallback** when no real authentication was resolved. + +The advisory header is **trivially spoofable** by any client and is never used for authorization or +workflow-ownership decisions — only for labeling events in this stream. If you need a trustworthy +`user` field, run the server with `--auth-file` and `--require-auth`. + +### Performance + +When no admin client is connected to the stream, the capture middleware short-circuits — no bodies +are buffered and no event is constructed, so the runtime cost on the request hot path is negligible. +Subscribing one or more admins begins event construction; subscribing _with_ `--include-bodies` is +what triggers body buffering. + +### Endpoint + +The underlying endpoint is `GET /torc-service/v1/admin/api-events/stream`. It accepts a single +optional query parameter, `include_bodies=true`. Like other admin endpoints (e.g. +`/admin/reload-auth`), it requires standard server authentication but no additional admin role — +restrict access via your htpasswd file or upstream proxy. + +## Server Load Stats + +For an at-a-glance view of how busy the server is — without streaming individual requests — use +`torc admin api-stats`. The server keeps a 1-hour ring of per-second counters; the CLI fetches an +aggregated snapshot and renders it as a table. 
+ +```bash +# Last hour, in 1-minute buckets (default) +torc admin api-stats + +# Last 5 minutes, in 30-second buckets +torc admin api-stats --window 300 --interval 30 + +# Raw JSON for scripts or jq +torc -f json admin api-stats +``` + +Each row reports request count, requests-per-second, bytes in / bytes out, and a 2xx / 4xx / 5xx +status breakdown for that interval. A trailing `Total:` line sums the entire window. + +### Counters + +The capture middleware records every request — independent of whether anyone is connected to +`tail-api`, so the ring is always up to date. Bytes are read from the `Content-Length` request and +response headers; chunked or streaming responses (notably the SSE event streams themselves) do not +advertise a length and contribute `0` bytes, although the request itself is still counted. Stats are +in-memory only and reset on server restart. + +### Endpoint + +The underlying endpoint is `GET /torc-service/v1/admin/api-stats`, with optional `window_seconds` +(default `3600`, capped at the ring size) and `interval_seconds` (default `60`) query parameters. +Buckets are returned newest-first. + ## Log Rotation Strategy The server uses automatic size-based rotation with the following defaults: diff --git a/src/client/apis/configuration.rs b/src/client/apis/configuration.rs index 69393845..b34b7f0f 100644 --- a/src/client/apis/configuration.rs +++ b/src/client/apis/configuration.rs @@ -210,10 +210,31 @@ impl Configuration { }; req_builder = req_builder.header("X-API-Key", value); } + // Advisory client identity, used by the server's API event stream + // when the request is not otherwise authenticated. Trivially + // spoofable; never used for authorization. + if let Some(value) = client_user_header_value() { + req_builder = req_builder.header(CLIENT_USER_HEADER, value); + } req_builder } } +/// Header that carries the advisory client OS username for unauthenticated +/// requests. 
Inspected by the server's `tail-api` event stream and ignored +/// by every other code path. +pub const CLIENT_USER_HEADER: &str = "X-Torc-Client-User"; + +/// Resolve the value to send in [`CLIENT_USER_HEADER`], skipping empty or +/// non-ASCII values that reqwest would reject. +pub fn client_user_header_value() -> Option<String> { + let user = crate::get_username(); + if user.is_empty() || !user.is_ascii() { + return None; + } + Some(user) +} + impl Default for Configuration { fn default() -> Self { Configuration { diff --git a/src/client/commands/admin.rs b/src/client/commands/admin.rs index 21042b17..8b9c3bd4 100644 --- a/src/client/commands/admin.rs +++ b/src/client/commands/admin.rs @@ -1,7 +1,13 @@ +use std::io::{BufRead, BufReader}; + +use chrono::{DateTime, Local, Utc}; use clap::Subcommand; +use serde::{Deserialize, Serialize}; use crate::client::apis; -use crate::client::apis::configuration::Configuration; +use crate::client::apis::configuration::{ + CLIENT_USER_HEADER, Configuration, client_user_header_value, +}; use crate::client::commands::print_error; #[derive(Subcommand)] @@ -19,6 +25,51 @@ EXAMPLES: " )] ReloadAuth, + + /// Tail inbound API requests via Server-Sent Events + #[command( + name = "tail-api", + after_long_help = "\ +EXAMPLES: + # Watch all API requests as they arrive + torc admin tail-api + + # Include request and response bodies (subject to size limits) + torc admin tail-api --include-bodies + + # Emit one JSON object per line, e.g. for piping to jq + torc -f json admin tail-api +" + )] + TailApi { + /// Stream captured request and response bodies along with metadata + #[arg(long)] + include_bodies: bool, + }, + + /// Show recent API request rate, throughput, and status mix + #[command( + name = "api-stats", + after_long_help = "\ +EXAMPLES: + # Last hour, in 1-minute buckets + torc admin api-stats + + # Last 5 minutes, in 30-second buckets + torc admin api-stats --window 300 --interval 30 + + # Raw JSON, e.g. 
for scripts or jq + torc -f json admin api-stats +" + )] + ApiStats { + /// Total span to report on, in seconds. Defaults to 3600 (1 hour). + #[arg(long, default_value_t = 3600)] + window: u64, + /// Aggregation bucket width, in seconds. Defaults to 60. + #[arg(long, default_value_t = 60)] + interval: u64, + }, } pub fn handle_admin_commands(config: &Configuration, command: &AdminCommands, format: &str) { @@ -36,5 +87,320 @@ pub fn handle_admin_commands(config: &Configuration, command: &AdminCommands, fo std::process::exit(1); } }, + AdminCommands::TailApi { include_bodies } => { + tail_api_events(config, *include_bodies, format); + } + AdminCommands::ApiStats { window, interval } => { + show_api_stats(config, *window, *interval, format); + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CapturedBody { + bytes: usize, + truncated: bool, + #[serde(default)] + text: Option<String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ApiRequestEvent { + timestamp_ms: i64, + method: String, + path: String, + #[serde(default)] + query: Option<String>, + status: u16, + latency_ms: u64, + #[serde(default)] + request_id: Option<String>, + #[serde(default)] + user: Option<String>, + #[serde(default)] + request_body: Option<CapturedBody>, + #[serde(default)] + response_body: Option<CapturedBody>, +} + +fn tail_api_events(config: &Configuration, include_bodies: bool, format: &str) { + let mut url = format!("{}/admin/api-events/stream", config.base_path); + if include_bodies { + url.push_str("?include_bodies=true"); + } + + eprintln!("Connecting to {} ...", url); + if include_bodies { + eprintln!("Body capture enabled (truncated to server-configured limit)."); + } + eprintln!("Press Ctrl+C to stop.\n"); + + let mut builder = reqwest::blocking::Client::builder().timeout(None); + if let Some(ref cookie) = config.cookie_header { + let mut headers = reqwest::header::HeaderMap::new(); + match reqwest::header::HeaderValue::from_str(cookie) { + Ok(v) => { + headers.insert(reqwest::header::COOKIE, v); + builder = 
builder.default_headers(headers); + } + Err(e) => { + eprintln!("Invalid cookie header: {}", e); + std::process::exit(1); + } + } + } + let client = match config.tls.configure_blocking_builder(builder).build() { + Ok(c) => c, + Err(e) => { + eprintln!("Failed to build HTTP client: {}", e); + std::process::exit(1); + } + }; + + let mut request = client.get(&url).header("Accept", "text/event-stream"); + if let Some(value) = client_user_header_value() { + request = request.header(CLIENT_USER_HEADER, value); + } + if let Some((ref username, ref password)) = config.basic_auth { + request = request.basic_auth(username.clone(), password.clone()); + } else if let Some(ref token) = config.bearer_access_token { + request = request.bearer_auth(token.clone()); + } else if let Some(ref api_key) = config.api_key { + let value = match api_key.prefix { + Some(ref prefix) => format!("{} {}", prefix, api_key.key), + None => api_key.key.clone(), + }; + request = request.header("X-API-KEY", value); + } + + let response = match request.send() { + Ok(r) => r, + Err(e) => { + eprintln!("Failed to connect to admin event stream: {}", e); + std::process::exit(1); + } + }; + if !response.status().is_success() { + eprintln!("Server returned error status: {}", response.status()); + std::process::exit(1); + } + + let mut reader = BufReader::new(response); + let mut event_type = String::new(); + let mut data = String::new(); + loop { + let mut line = String::new(); + match reader.read_line(&mut line) { + Ok(0) => { + eprintln!("\nServer closed the stream."); + break; + } + Ok(_) => {} + Err(e) => { + eprintln!("\nError reading stream: {}", e); + break; + } + } + + let trimmed = line.trim_end(); + if trimmed.is_empty() { + if !data.is_empty() { + handle_frame(&event_type, &data, format); + } + event_type.clear(); + data.clear(); + continue; + } + if let Some(value) = trimmed.strip_prefix("event: ") { + event_type = value.to_string(); + } else if let Some(value) = trimmed.strip_prefix("data: ") { 
+ if !data.is_empty() { + data.push('\n'); + } + data.push_str(value); + } + } +} + +fn handle_frame(event_type: &str, data: &str, format: &str) { + if event_type == "warning" { + eprintln!("warning: {}", data); + return; + } + match serde_json::from_str::<ApiRequestEvent>(data) { + Ok(event) => print_event(&event, format), + Err(e) => eprintln!("Failed to parse event: {} - data: {}", e, data), + } +} + +fn print_event(event: &ApiRequestEvent, format: &str) { + if format == "json" { + if let Ok(line) = serde_json::to_string(event) { + println!("{}", line); + } + return; + } + + let ts = format_ms(event.timestamp_ms); + let user = event.user.as_deref().unwrap_or("-"); + let span = event.request_id.as_deref().unwrap_or("-"); + let path = match &event.query { + Some(q) if !q.is_empty() => format!("{}?{}", event.path, q), + _ => event.path.clone(), + }; + println!( + "[{ts}] {status:>3} {method:<6} {path} ({latency}ms, user={user}, span={span})", + status = event.status, + method = event.method, + latency = event.latency_ms, + ); + if let Some(body) = &event.request_body { + print_body(" request ", body); + } + if let Some(body) = &event.response_body { + print_body(" response ", body); + } +} + +fn print_body(label: &str, body: &CapturedBody) { + let suffix = if body.truncated { " [truncated]" } else { "" }; + match &body.text { + Some(text) => { + let preview = text.replace('\n', " "); + println!( + "{label}{} bytes{suffix}: {}", + body.bytes, + preview.trim_end() + ); + } + None => println!("{label}{} bytes (binary){suffix}", body.bytes), + } +} + +fn format_ms(ts: i64) -> String { + DateTime::from_timestamp_millis(ts) + .map(|dt: DateTime<Utc>| { + dt.with_timezone(&Local) + .format("%Y-%m-%d %H:%M:%S%.3f") + .to_string() + }) + .unwrap_or_else(|| format!("{}ms", ts)) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ApiStatsBucket { + start_ms: i64, + request_count: u64, + bytes_in: u64, + bytes_out: u64, + status_2xx: u64, + status_4xx: u64, + status_5xx: u64, + 
#[serde(default)] + status_other: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ApiStatsSnapshot { + now_ms: i64, + interval_seconds: u64, + window_seconds: u64, + buckets: Vec<ApiStatsBucket>, +} + +fn show_api_stats(config: &Configuration, window: u64, interval: u64, format: &str) { + let url = format!( + "{}/admin/api-stats?window_seconds={window}&interval_seconds={interval}", + config.base_path + ); + + let request = config.client.get(&url).header("Accept", "application/json"); + let request = config.apply_auth(request); + + let snapshot: ApiStatsSnapshot = match request.send() { + Ok(resp) if resp.status().is_success() => match resp.json() { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to parse api-stats response: {}", e); + std::process::exit(1); + } + }, + Ok(resp) => { + eprintln!("Server returned error status: {}", resp.status()); + std::process::exit(1); + } + Err(e) => { + eprintln!("Failed to fetch api-stats: {}", e); + std::process::exit(1); + } + }; + + if format == "json" { + match serde_json::to_string_pretty(&snapshot) { + Ok(s) => println!("{}", s), + Err(e) => eprintln!("Failed to serialize: {}", e), + } + return; + } + + print_stats_table(&snapshot); +} + +fn print_stats_table(snap: &ApiStatsSnapshot) { + let interval = snap.interval_seconds.max(1); + let mut total_requests = 0u64; + let mut total_bytes_in = 0u64; + let mut total_bytes_out = 0u64; + + println!( + "Window: last {}s in {}s buckets (newest first)\n", + snap.window_seconds, snap.interval_seconds + ); + println!( + "{:<22} {:>8} {:>8} {:>10} {:>10} {:>6} {:>6} {:>6}", + "bucket start", "req", "req/s", "in", "out", "2xx", "4xx", "5xx" + ); + println!("{}", "-".repeat(82)); + + for b in &snap.buckets { + let req_per_s = b.request_count as f64 / interval as f64; + total_requests += b.request_count; + total_bytes_in += b.bytes_in; + total_bytes_out += b.bytes_out; + println!( + "{:<22} {:>8} {:>8.2} {:>10} {:>10} {:>6} {:>6} {:>6}", + format_ms(b.start_ms), + b.request_count, + 
req_per_s, + humanize_bytes(b.bytes_in), + humanize_bytes(b.bytes_out), + b.status_2xx, + b.status_4xx, + b.status_5xx, + ); + } + + println!("{}", "-".repeat(82)); + println!( + "Total: {} requests, {} in, {} out over {}s", + total_requests, + humanize_bytes(total_bytes_in), + humanize_bytes(total_bytes_out), + snap.window_seconds + ); +} + +fn humanize_bytes(bytes: u64) -> String { + const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB"]; + let mut value = bytes as f64; + let mut unit = 0; + while value >= 1024.0 && unit + 1 < UNITS.len() { + value /= 1024.0; + unit += 1; + } + if unit == 0 { + format!("{} {}", bytes, UNITS[0]) + } else { + format!("{:.1} {}", value, UNITS[unit]) } } diff --git a/src/client/sse_client.rs b/src/client/sse_client.rs index 2d8aca88..c689dcbb 100644 --- a/src/client/sse_client.rs +++ b/src/client/sse_client.rs @@ -3,7 +3,9 @@ //! This module provides a client for connecting to the SSE endpoint and //! receiving real-time job events from the server. -use crate::client::apis::configuration::Configuration; +use crate::client::apis::configuration::{ + CLIENT_USER_HEADER, Configuration, client_user_header_value, +}; use crate::models::EventSeverity; use serde::{Deserialize, Serialize}; use std::io::{BufRead, BufReader}; @@ -101,6 +103,9 @@ impl SseConnection { // Build request and apply authentication from Configuration let mut request = client.get(&url).header("Accept", "text/event-stream"); + if let Some(value) = client_user_header_value() { + request = request.header(CLIENT_USER_HEADER, value); + } // Apply basic authentication if configured if let Some((ref username, ref password)) = config.basic_auth { diff --git a/src/server.rs b/src/server.rs index b60169c0..63599798 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,7 +6,9 @@ pub mod api; pub mod api_constants; pub mod api_contract; +pub mod api_event_stream; pub mod api_responses; +pub mod api_stats; pub mod auth; pub mod authorization; pub mod context; diff --git 
a/src/server/api_event_stream.rs b/src/server/api_event_stream.rs new file mode 100644 index 00000000..859dcdbd --- /dev/null +++ b/src/server/api_event_stream.rs @@ -0,0 +1,240 @@ +//! Broadcast channel and event types for the admin API request inspector. +//! +//! Each HTTP request that flows through the live router emits an +//! [`ApiRequestEvent`] to subscribers of the admin SSE stream. When no +//! subscribers are connected, sends are essentially a no-op so the runtime +//! cost is limited to a clock read and a `Sender::send` on a channel with +//! zero receivers. + +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use tokio::sync::broadcast; + +/// Default cap on captured request/response body bytes per direction +/// that are forwarded to subscribers. +pub const DEFAULT_BODY_CAPTURE_LIMIT: usize = 8 * 1024; + +/// Environment variable that overrides [`DEFAULT_BODY_CAPTURE_LIMIT`]. +pub const BODY_CAPTURE_LIMIT_ENV: &str = "TORC_API_EVENT_BODY_MAX_BYTES"; + +/// Hard ceiling on bytes the middleware will buffer in memory in order +/// to capture a body. Requests/responses whose advertised length +/// exceeds this are passed through untouched (no body capture). +pub const BODY_CAPTURE_HARD_CAP_BYTES: usize = 1024 * 1024; + +/// Resolve the per-direction body display limit at runtime. +pub fn body_capture_limit() -> usize { + std::env::var(BODY_CAPTURE_LIMIT_ENV) + .ok() + .and_then(|v| v.parse::<usize>().ok()) + .unwrap_or(DEFAULT_BODY_CAPTURE_LIMIT) +} + +/// A single captured HTTP request/response, broadcast to admin SSE clients. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ApiRequestEvent { + /// Unix epoch milliseconds at which the request finished. + pub timestamp_ms: i64, + /// HTTP method (e.g. `GET`, `POST`). + pub method: String, + /// URL path component (without query string). + pub path: String, + /// URL query string, if any (without the leading `?`). 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub query: Option<String>, + /// Final HTTP status code returned to the client. + pub status: u16, + /// Wall-clock duration spent inside the router, in milliseconds. + pub latency_ms: u64, + /// `x-span-id` assigned by `inject_request_context`, when available. + #[serde(skip_serializing_if = "Option::is_none")] + pub request_id: Option<String>, + /// Authenticated subject extracted from the request, when available. + #[serde(skip_serializing_if = "Option::is_none")] + pub user: Option<String>, + /// Captured request body when body capture was enabled and the + /// payload was textual. + #[serde(skip_serializing_if = "Option::is_none")] + pub request_body: Option<CapturedBody>, + /// Captured response body when body capture was enabled and the + /// payload was textual. + #[serde(skip_serializing_if = "Option::is_none")] + pub response_body: Option<CapturedBody>, +} + +/// Captured payload, possibly truncated. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CapturedBody { + /// Total observed length in bytes (before truncation). + pub bytes: usize, + /// Whether `text` was truncated to fit the capture limit. + pub truncated: bool, + /// UTF-8 view of the (possibly truncated) body. `None` when the + /// body was not valid UTF-8 — binary payloads are reported as + /// metadata only. + #[serde(skip_serializing_if = "Option::is_none")] + pub text: Option<String>, +} + +impl CapturedBody { + /// Build a [`CapturedBody`] from raw bytes, truncating to `limit`. + pub fn from_bytes(bytes: &[u8], limit: usize) -> Self { + let total = bytes.len(); + let truncated = total > limit; + let slice = if truncated { &bytes[..limit] } else { bytes }; + let text = std::str::from_utf8(slice).ok().map(|s| s.to_string()); + Self { + bytes: total, + truncated, + text, + } + } +} + +/// Broadcast channel for [`ApiRequestEvent`]s. 
+#[derive(Clone)] +pub struct ApiEventBroadcaster { + sender: Arc<broadcast::Sender<ApiRequestEvent>>, + body_subscribers: Arc<AtomicUsize>, +} + +impl ApiEventBroadcaster { + /// Create a broadcaster with the given channel capacity. + pub fn new(capacity: usize) -> Self { + let (sender, _) = broadcast::channel(capacity); + Self { + sender: Arc::new(sender), + body_subscribers: Arc::new(AtomicUsize::new(0)), + } + } + + /// Returns the number of currently connected receivers. + pub fn receiver_count(&self) -> usize { + self.sender.receiver_count() + } + + /// Returns the number of receivers that asked for body capture. + pub fn body_subscriber_count(&self) -> usize { + self.body_subscribers.load(Ordering::Relaxed) + } + + /// Broadcast an event. Returns `true` if at least one receiver was + /// notified. Drops silently when there are no subscribers. + pub fn broadcast(&self, event: ApiRequestEvent) -> bool { + self.sender.send(event).is_ok() + } + + /// Subscribe to the channel. + pub fn subscribe(&self) -> broadcast::Receiver<ApiRequestEvent> { + self.sender.subscribe() + } + + /// Register interest in body capture; the returned guard decrements + /// the body-subscriber count when dropped. + pub fn body_subscriber_guard(&self) -> BodySubscriberGuard { + self.body_subscribers.fetch_add(1, Ordering::Relaxed); + BodySubscriberGuard { + counter: self.body_subscribers.clone(), + } + } +} + +impl Default for ApiEventBroadcaster { + fn default() -> Self { + Self::new(256) + } +} + +/// RAII guard that keeps the broadcaster's body-subscriber count +/// elevated for as long as it is held. 
+pub struct BodySubscriberGuard { + counter: Arc<AtomicUsize>, +} + +impl Drop for BodySubscriberGuard { + fn drop(&mut self) { + self.counter.fetch_sub(1, Ordering::Relaxed); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_event() -> ApiRequestEvent { + ApiRequestEvent { + timestamp_ms: 1_700_000_000_000, + method: "GET".into(), + path: "/torc-service/v1/ping".into(), + query: None, + status: 200, + latency_ms: 4, + request_id: Some("span-1".into()), + user: Some("alice".into()), + request_body: None, + response_body: None, + } + } + + #[tokio::test] + async fn broadcast_delivers_to_subscribers() { + let bus = ApiEventBroadcaster::new(8); + let mut rx = bus.subscribe(); + assert_eq!(bus.receiver_count(), 1); + + assert!(bus.broadcast(sample_event())); + let received = rx.recv().await.expect("event"); + assert_eq!(received.path, "/torc-service/v1/ping"); + assert_eq!(received.status, 200); + } + + #[tokio::test] + async fn broadcast_with_no_subscribers_is_noop() { + let bus = ApiEventBroadcaster::new(8); + assert_eq!(bus.receiver_count(), 0); + assert!(!bus.broadcast(sample_event())); + } + + #[test] + fn body_subscriber_guard_tracks_count() { + let bus = ApiEventBroadcaster::new(8); + assert_eq!(bus.body_subscriber_count(), 0); + + let guard1 = bus.body_subscriber_guard(); + let guard2 = bus.body_subscriber_guard(); + assert_eq!(bus.body_subscriber_count(), 2); + + drop(guard1); + assert_eq!(bus.body_subscriber_count(), 1); + drop(guard2); + assert_eq!(bus.body_subscriber_count(), 0); + } + + #[test] + fn captured_body_truncates() { + let body = b"hello world".as_slice(); + let captured = CapturedBody::from_bytes(body, 5); + assert_eq!(captured.bytes, 11); + assert!(captured.truncated); + assert_eq!(captured.text.as_deref(), Some("hello")); + } + + #[test] + fn captured_body_full() { + let body = b"abc".as_slice(); + let captured = CapturedBody::from_bytes(body, 16); + assert_eq!(captured.bytes, 3); + assert!(!captured.truncated); + 
assert_eq!(captured.text.as_deref(), Some("abc")); + } + + #[test] + fn captured_body_binary_drops_text() { + let body = &[0xff, 0xfe, 0xfd][..]; + let captured = CapturedBody::from_bytes(body, 16); + assert_eq!(captured.bytes, 3); + assert!(!captured.truncated); + assert!(captured.text.is_none()); + } +} diff --git a/src/server/api_stats.rs b/src/server/api_stats.rs new file mode 100644 index 00000000..acd54667 --- /dev/null +++ b/src/server/api_stats.rs @@ -0,0 +1,256 @@ +//! Lightweight per-second ring buffer of HTTP request stats. +//! +//! Tracks request count, request/response bytes (from `Content-Length` +//! headers — see caveats below), and a 2xx/4xx/5xx breakdown for the +//! last hour. Used by the `GET /admin/api-stats` endpoint and the +//! `torc admin api-stats` CLI to answer "how busy is the server right +//! now?" without streaming individual events. +//! +//! ## Caveats +//! +//! * Byte counts come from `Content-Length` request and response headers +//! only. Chunked / streaming responses (notably the SSE event streams) +//! do not advertise a length and will be reported as 0 bytes; the +//! request *count* is still recorded. +//! * Stats are in-memory only. The buffer is cleared on server restart. + +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +/// Number of 1-second buckets retained. One hour of history. +pub const BUCKET_COUNT: usize = 3600; + +/// Default window the API endpoint reports when none is requested. +pub const DEFAULT_WINDOW_SECONDS: u64 = 3600; + +/// Default aggregation interval for the API endpoint. +pub const DEFAULT_INTERVAL_SECONDS: u64 = 60; + +/// One second's worth of accumulated request stats. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ApiStatsBucket { + /// Bucket start, in Unix epoch milliseconds. + pub start_ms: i64, + /// Total requests handled during this bucket. + pub request_count: u64, + /// Sum of `Content-Length` from inbound requests. 
+ pub bytes_in: u64, + /// Sum of `Content-Length` from outbound responses. + pub bytes_out: u64, + /// Requests that returned a 2xx status. + pub status_2xx: u64, + /// Requests that returned a 4xx status. + pub status_4xx: u64, + /// Requests that returned a 5xx status. + pub status_5xx: u64, + /// Requests that returned anything else (1xx, 3xx). + pub status_other: u64, +} + +impl ApiStatsBucket { + fn merge(&mut self, other: &ApiStatsBucket) { + self.request_count += other.request_count; + self.bytes_in += other.bytes_in; + self.bytes_out += other.bytes_out; + self.status_2xx += other.status_2xx; + self.status_4xx += other.status_4xx; + self.status_5xx += other.status_5xx; + self.status_other += other.status_other; + } +} + +/// Snapshot returned by [`ApiStatsRing::snapshot`]. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ApiStatsSnapshot { + /// Server-side current time in Unix epoch milliseconds. + pub now_ms: i64, + /// Width of each bucket in seconds. + pub interval_seconds: u64, + /// Total span covered by `buckets`, in seconds. + pub window_seconds: u64, + /// Newest first: `buckets[0]` is the most recent interval. + pub buckets: Vec<ApiStatsBucket>, +} + +/// Mutex-protected ring of per-second counters. +#[derive(Clone)] +pub struct ApiStatsRing { + inner: Arc<Mutex<Inner>>, +} + +struct Inner { + /// One bucket per second, indexed by `(unix_seconds % BUCKET_COUNT)`. + /// `start_ms = 0` means "never written". + buckets: Box<[ApiStatsBucket]>, +} + +impl ApiStatsRing { + pub fn new() -> Self { + let buckets = (0..BUCKET_COUNT) + .map(|_| ApiStatsBucket::default()) + .collect::<Vec<_>>() + .into_boxed_slice(); + Self { + inner: Arc::new(Mutex::new(Inner { buckets })), + } + } + + /// Record a single completed request. 
+ pub fn record(&self, now_ms: i64, status: u16, bytes_in: u64, bytes_out: u64) { + if now_ms <= 0 { + return; + } + let bucket_start_ms = (now_ms / 1000) * 1000; + let idx = ((now_ms / 1000) as i128).rem_euclid(BUCKET_COUNT as i128) as usize; + let mut guard = self.inner.lock(); + let bucket = &mut guard.buckets[idx]; + if bucket.start_ms != bucket_start_ms { + *bucket = ApiStatsBucket { + start_ms: bucket_start_ms, + ..ApiStatsBucket::default() + }; + } + bucket.request_count += 1; + bucket.bytes_in += bytes_in; + bucket.bytes_out += bytes_out; + match status / 100 { + 2 => bucket.status_2xx += 1, + 4 => bucket.status_4xx += 1, + 5 => bucket.status_5xx += 1, + _ => bucket.status_other += 1, + } + } + + /// Aggregate the last `window_seconds` of recorded data into + /// `interval_seconds`-wide buckets, newest first. + pub fn snapshot( + &self, + now_ms: i64, + window_seconds: u64, + interval_seconds: u64, + ) -> ApiStatsSnapshot { + let interval_seconds = interval_seconds.max(1); + let window_seconds = window_seconds + .max(interval_seconds) + .min(BUCKET_COUNT as u64); + let bucket_count = window_seconds.div_ceil(interval_seconds); + + let now_secs = now_ms / 1000; + // Anchor each output bucket to the floor of `now` aligned to + // `interval_seconds` so successive snapshots report consistent + // boundaries. 
+ let latest_start_secs = (now_secs / interval_seconds as i64) * interval_seconds as i64; + + let mut output = Vec::with_capacity(bucket_count as usize); + let guard = self.inner.lock(); + for i in 0..bucket_count { + let start_secs = latest_start_secs - (i as i64) * (interval_seconds as i64); + let mut agg = ApiStatsBucket { + start_ms: start_secs * 1000, + ..ApiStatsBucket::default() + }; + for offset in 0..interval_seconds as i64 { + let sec = start_secs + offset; + let idx = (sec as i128).rem_euclid(BUCKET_COUNT as i128) as usize; + let stored = &guard.buckets[idx]; + if stored.start_ms / 1000 == sec { + agg.merge(stored); + } + } + output.push(agg); + } + ApiStatsSnapshot { + now_ms, + interval_seconds, + window_seconds: bucket_count * interval_seconds, + buckets: output, + } + } +} + +impl Default for ApiStatsRing { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn record_and_snapshot_single_bucket() { + let ring = ApiStatsRing::new(); + let t = 1_700_000_000_000; + ring.record(t, 200, 100, 200); + ring.record(t + 100, 200, 50, 75); + ring.record(t + 200, 404, 0, 30); + ring.record(t + 300, 500, 0, 10); + + let snap = ring.snapshot(t + 500, 60, 60); + assert_eq!(snap.buckets.len(), 1); + let bucket = &snap.buckets[0]; + assert_eq!(bucket.request_count, 4); + assert_eq!(bucket.bytes_in, 150); + assert_eq!(bucket.bytes_out, 315); + assert_eq!(bucket.status_2xx, 2); + assert_eq!(bucket.status_4xx, 1); + assert_eq!(bucket.status_5xx, 1); + assert_eq!(bucket.status_other, 0); + } + + #[test] + fn snapshot_aggregates_across_seconds() { + let ring = ApiStatsRing::new(); + let t = 1_700_000_000_000; + // Two requests in second 0 + ring.record(t, 200, 1, 2); + ring.record(t + 100, 200, 3, 4); + // One request 30 seconds later + ring.record(t + 30_000, 200, 10, 20); + + // 60-second bucket should include all three + let snap = ring.snapshot(t + 35_000, 60, 60); + assert_eq!(snap.buckets[0].request_count, 3); + 
assert_eq!(snap.buckets[0].bytes_in, 14); + assert_eq!(snap.buckets[0].bytes_out, 26); + } + + #[test] + fn snapshot_returns_separate_buckets_at_finer_interval() { + let ring = ApiStatsRing::new(); + let t0 = 1_700_000_060_000; // 20s past a 60s boundary (1_700_000_040) + ring.record(t0 + 1_000, 200, 0, 0); + ring.record(t0 + 2_000, 200, 0, 0); + ring.record(t0 - 30_000, 200, 0, 0); + + let snap = ring.snapshot(t0 + 5_000, 120, 60); + assert_eq!(snap.buckets.len(), 2); + // newest bucket (covering t0-20s..t0+40s) saw 2 requests + assert_eq!(snap.buckets[0].request_count, 2); + // previous minute saw 1 + assert_eq!(snap.buckets[1].request_count, 1); + } + + #[test] + fn snapshot_ignores_stale_data_from_recycled_slots() { + let ring = ApiStatsRing::new(); + let t0 = 1_700_000_000_000; + // Record well outside the window the snapshot will look at. + ring.record(t0 - 7_200_000, 200, 99, 99); + // The current minute should report zero, not the stale bucket + // that happens to live in the same ring slot. + let snap = ring.snapshot(t0, 60, 60); + assert_eq!(snap.buckets[0].request_count, 0); + assert_eq!(snap.buckets[0].bytes_in, 0); + assert_eq!(snap.buckets[0].bytes_out, 0); + } + + #[test] + fn snapshot_clamps_window_to_buffer_size() { + let ring = ApiStatsRing::new(); + let snap = ring.snapshot(1_700_000_000_000, 1_000_000, 60); + assert!(snap.window_seconds <= BUCKET_COUNT as u64); + } +} diff --git a/src/server/live_router.rs b/src/server/live_router.rs index 3418276d..cccae7a5 100644 --- a/src/server/live_router.rs +++ b/src/server/live_router.rs @@ -1,6 +1,13 @@ use crate::models; use crate::openapi_spec::{OpenApiAppState, PingResponse, VersionResponse}; use crate::server::api_contract::TransportApiCore; +use crate::server::api_event_stream::{ + ApiEventBroadcaster, ApiRequestEvent, BODY_CAPTURE_HARD_CAP_BYTES, CapturedBody, + body_capture_limit, +}; +use crate::server::api_stats::{ + ApiStatsRing, ApiStatsSnapshot, DEFAULT_INTERVAL_SECONDS, DEFAULT_WINDOW_SECONDS, +}; use
crate::server::auth::{SharedCredentialCache, SharedHtpasswd}; use crate::server::credential_cache::CredentialCache; use crate::server::dashboard::serve_dashboard; @@ -8,11 +15,11 @@ use crate::server::htpasswd::HtpasswdFile; use crate::server::http_server::Server; use crate::server::http_transport::*; use crate::server::transport_types::auth_types::{AuthData, Authorization, Scopes, from_headers}; -use crate::server::transport_types::context_types::{EmptyContext, Push, XSpanIdString}; +use crate::server::transport_types::context_types::{EmptyContext, Has, Push, XSpanIdString}; use axum::Router; use axum::body::Body; use axum::extract::{DefaultBodyLimit, Path, Query, Request, State}; -use axum::http::header::{HeaderName, HeaderValue}; +use axum::http::header::{CONTENT_TYPE, HeaderName, HeaderValue}; use axum::http::{Response, StatusCode}; use axum::middleware::{self, Next}; use axum::routing::{delete, get, post, put}; @@ -285,6 +292,11 @@ pub fn app_router(state: LiveRouterState) -> Router { get(list_ro_crate_entities).delete(delete_ro_crate_entities), ) .route("/torc-service/v1/admin/reload-auth", post(reload_auth)) + .route( + "/torc-service/v1/admin/api-events/stream", + get(admin_api_events_stream), + ) + .route("/torc-service/v1/admin/api-stats", get(admin_api_stats)) .route( "/torc-service/v1/workflows", get(list_workflows).post(create_workflow), @@ -369,10 +381,24 @@ pub fn app_router(state: LiveRouterState) -> Router { get(workflow_events_stream_route), ) .fallback(dashboard_fallback) + // Innermost: build SSE events for connected admin subscribers. + // Reads the auth context populated by `inject_request_context`. + .layer(middleware::from_fn_with_state( + state.server.api_event_broadcaster.clone(), + capture_api_event, + )) + // Middle: resolve auth, inject request context, short-circuit + // on unauthenticated requests. .layer(middleware::from_fn_with_state( state.auth.clone(), inject_request_context, )) + // Outermost: load-stats accounting. 
Lives outside the auth layer + // so that 401s are still counted in `/admin/api-stats`. + .layer(middleware::from_fn_with_state( + state.server.api_stats.clone(), + record_api_stats, + )) .with_state(state) } @@ -527,6 +553,122 @@ pub async fn reload_auth( } } +#[derive(Debug, Clone, Deserialize, IntoParams)] +#[into_params(parameter_in = Query)] +pub struct ApiEventStreamQuery { + /// When true, captured request and response bodies are included in + /// each event. Bodies are only captured when the inbound size is known + /// up front (via `Content-Length` or body size hint) and at most + /// 1 MiB; SSE response streams are skipped. Defaults to false so + /// payloads aren't streamed unless requested. + #[serde(default)] + #[param(nullable = true)] + pub include_bodies: Option<bool>, +} + +#[utoipa::path( + get, + tag = "system", + path = "/admin/api-events/stream", + operation_id = "admin_api_events_stream", + params(ApiEventStreamQuery), + responses( + (status = 200, description = "Server-Sent Events stream of inbound API requests") + ) +)] +pub async fn admin_api_events_stream( + State(state): State<LiveRouterState>, + Query(params): Query<ApiEventStreamQuery>, +) -> Response<Body> { + let bus = state.server.api_event_broadcaster.clone(); + let mut receiver = bus.subscribe(); + let include_bodies = params.include_bodies.unwrap_or(false); + let body_guard = if include_bodies { + Some(bus.body_subscriber_guard()) + } else { + None + }; + + let stream = async_stream::stream! { + // Keep the body-subscriber count elevated for the lifetime of + // this connection.
+ let _body_guard = body_guard; + loop { + match receiver.recv().await { + Ok(mut event) => { + redact_for_subscriber(&mut event, include_bodies); + let data = serde_json::to_string(&event).unwrap_or_default(); + yield Ok::<_, std::convert::Infallible>(format!( + "event: api\ndata: {}\n\n", + data + )); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(count)) => { + yield Ok::<_, std::convert::Infallible>(format!( + "event: warning\ndata: {{\"dropped\": {}}}\n\n", + count + )); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }; + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "text/event-stream") + .header("Cache-Control", "no-cache") + .header("X-Accel-Buffering", "no") + .body(Body::from_stream(stream)) + .expect("valid SSE response") +} + +#[derive(Debug, Clone, Deserialize, IntoParams)] +#[into_params(parameter_in = Query)] +pub struct ApiStatsQuery { + /// Total span of time to report on, in seconds. Defaults to 3600 + /// (1 hour). Capped at the ring buffer's retention (3600s). + #[serde(default)] + #[param(nullable = true)] + pub window_seconds: Option, + /// Aggregation bucket width in seconds. Defaults to 60. 
+ #[serde(default)] + #[param(nullable = true)] + pub interval_seconds: Option, +} + +#[utoipa::path( + get, + tag = "system", + path = "/admin/api-stats", + operation_id = "admin_api_stats", + params(ApiStatsQuery), + responses( + (status = 200, description = "Aggregated request counts and bytes per bucket") + ) +)] +pub async fn admin_api_stats( + State(state): State, + Query(params): Query, +) -> Response { + let window = params.window_seconds.unwrap_or(DEFAULT_WINDOW_SECONDS); + let interval = params.interval_seconds.unwrap_or(DEFAULT_INTERVAL_SECONDS); + let now_ms = chrono::Utc::now().timestamp_millis(); + let snapshot: ApiStatsSnapshot = state.server.api_stats.snapshot(now_ms, window, interval); + json_response_ok(&snapshot) +} + +fn json_response_ok(value: &T) -> Response { + match serde_json::to_vec(value) { + Ok(body) => Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, "application/json") + .body(Body::from(body)) + .expect("valid json response"), + Err(err) => error_response(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()), + } +} + async fn dashboard_fallback(request: Request) -> Response { serve_dashboard(request.uri().path()).unwrap_or_else(not_found_response) } @@ -4110,6 +4252,218 @@ async fn inject_request_context( response } +async fn record_api_stats( + State(stats): State, + request: Request, + next: Next, +) -> Response { + let bytes_in = content_length_header(request.headers()); + let response = next.run(request).await; + let bytes_out = content_length_header(response.headers()); + stats.record( + chrono::Utc::now().timestamp_millis(), + response.status().as_u16(), + bytes_in, + bytes_out, + ); + response +} + +async fn capture_api_event( + State(bus): State, + request: Request, + next: Next, +) -> Response { + if bus.receiver_count() == 0 { + return next.run(request).await; + } + let want_bodies = bus.body_subscriber_count() > 0; + + let started = std::time::Instant::now(); + let method = 
request.method().as_str().to_owned(); + let path = request.uri().path().to_owned(); + let query = request.uri().query().map(|s| s.to_owned()); + + let advisory_user = request + .headers() + .get("x-torc-client-user") + .and_then(|v| v.to_str().ok()) + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|s| s.to_owned()); + + let (request_id, user) = match request.extensions().get::() { + Some(ctx) => { + let span_id: &XSpanIdString = ctx.get(); + let auth: &Option = ctx.get(); + let resolved = auth.as_ref().map(|a| a.subject.clone()); + // When the server runs without `--auth-file`, every request + // resolves to "anonymous" — fall back to the advisory client + // header so the event stream surfaces who's calling. + let user = match resolved.as_deref() { + None | Some("anonymous") => advisory_user.or(resolved), + Some(_) => resolved, + }; + (Some(span_id.0.clone()), user) + } + None => (None, advisory_user), + }; + + let display_limit = body_capture_limit(); + + let (request, request_body) = if want_bodies { + match capture_request_body(request, display_limit).await { + Ok(pair) => pair, + Err(early) => return early, + } + } else { + (request, None) + }; + + let response = next.run(request).await; + + let (response, response_body) = if want_bodies { + match capture_response_body(response, display_limit).await { + Ok(pair) => pair, + Err(early) => return early, + } + } else { + (response, None) + }; + + let event = ApiRequestEvent { + timestamp_ms: chrono::Utc::now().timestamp_millis(), + method, + path, + query, + status: response.status().as_u16(), + latency_ms: started.elapsed().as_millis().min(u64::MAX as u128) as u64, + request_id, + user, + request_body, + response_body, + }; + + bus.broadcast(event); + response +} + +fn content_length_header(headers: &axum::http::HeaderMap) -> u64 { + headers + .get(axum::http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0) +} + +/// Outcome of attempting to 
capture a request body. +/// +/// `Err(response)` means the body could not be read and the middleware +/// should short-circuit with the contained error response rather than +/// silently substituting an empty body for the handler. +async fn capture_request_body( + request: Request, + display_limit: usize, +) -> Result<(Request, Option<CapturedBody>), Response<Body>> { + use http_body::Body as _; + use http_body_util::BodyExt as _; + + if !body_size_known_within_cap(request.headers(), request.body().size_hint().upper()) { + return Ok((request, None)); + } + + let (parts, body) = request.into_parts(); + match body.collect().await { + Ok(collected) => { + let bytes = collected.to_bytes(); + let captured = if bytes.len() > BODY_CAPTURE_HARD_CAP_BYTES { + None + } else { + Some(CapturedBody::from_bytes(&bytes, display_limit)) + }; + let new_req = Request::from_parts(parts, Body::from(bytes)); + Ok((new_req, captured)) + } + Err(_) => Err(error_response( + StatusCode::BAD_REQUEST, + "request body could not be read".to_string(), + )), + } +} + +/// Outcome of attempting to capture a response body. `Err(response)` +/// means the handler's body errored mid-stream while we were buffering +/// it for capture; we replace the broken response with a synthesized +/// 502 rather than truncating it to an empty body.
+async fn capture_response_body( + response: Response<Body>, + display_limit: usize, +) -> Result<(Response<Body>, Option<CapturedBody>), Response<Body>> { + use http_body::Body as _; + use http_body_util::BodyExt as _; + + let is_event_stream = response + .headers() + .get(CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .map(|ct| ct.starts_with("text/event-stream")) + .unwrap_or(false); + if is_event_stream { + return Ok((response, None)); + } + if !body_size_known_within_cap(response.headers(), response.body().size_hint().upper()) { + return Ok((response, None)); + } + + let (parts, body) = response.into_parts(); + match body.collect().await { + Ok(collected) => { + let bytes = collected.to_bytes(); + let captured = if bytes.len() > BODY_CAPTURE_HARD_CAP_BYTES { + None + } else { + Some(CapturedBody::from_bytes(&bytes, display_limit)) + }; + let new_resp = Response::from_parts(parts, Body::from(bytes)); + Ok((new_resp, captured)) + } + Err(_) => Err(error_response( + StatusCode::BAD_GATEWAY, + "response body could not be read".to_string(), + )), + } +} + +/// Redact any captured bodies for subscribers that did not opt in to +/// `include_bodies=true`. Bodies are captured at the broadcaster level +/// when *any* subscriber wants them, so per-connection filtering happens +/// here. +fn redact_for_subscriber(event: &mut ApiRequestEvent, include_bodies: bool) { + if !include_bodies { + event.request_body = None; + event.response_body = None; + } +} + +/// Returns `true` only when we can prove (from `Content-Length` or the +/// body's own size hint) that the payload is at most +/// [`BODY_CAPTURE_HARD_CAP_BYTES`]. Bodies whose size is unknown (e.g. +/// chunked uploads with no advertised length) are treated as too big to +/// safely buffer for capture.
+fn body_size_known_within_cap(headers: &axum::http::HeaderMap, hint_upper: Option<u64>) -> bool { + let cl = headers + .get(axum::http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::<u64>().ok()); + let bounded = match (cl, hint_upper) { + (None, None) => return false, + (Some(c), None) => c, + (None, Some(u)) => u, + (Some(c), Some(u)) => c.max(u), + }; + bounded <= BODY_CAPTURE_HARD_CAP_BYTES as u64 +} + fn add_standard_response_headers(response: &mut Response<Body>, span_id: &XSpanIdString) { response.headers_mut().insert( HeaderName::from_static("x-span-id"), @@ -4262,6 +4616,303 @@ mod live_router_tests { assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); } + #[tokio::test] + async fn capture_middleware_publishes_event_for_each_request() { + let server = test_server_with_schema().await; + let bus = server.api_event_broadcaster.clone(); + let mut rx = bus.subscribe(); + let router = test_router(server); + + let response = router + .oneshot( + Request::builder() + .method("GET") + .uri("/torc-service/v1/ping?probe=1") + .body(Body::empty()) + .expect("valid request"), + ) + .await + .expect("router response"); + + assert_eq!(response.status(), StatusCode::OK); + + let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + .await + .expect("event broadcast within 1s") + .expect("broadcast succeeded"); + + assert_eq!(event.method, "GET"); + assert_eq!(event.path, "/torc-service/v1/ping"); + assert_eq!(event.query.as_deref(), Some("probe=1")); + assert_eq!(event.status, 200); + assert!(event.request_id.is_some(), "x-span-id should be set"); + // Without a body subscriber, bodies stay None even though a + // metadata receiver is connected.
+ assert!(event.request_body.is_none()); + assert!(event.response_body.is_none()); + } + + #[tokio::test] + async fn api_stats_endpoint_reports_recorded_requests() { + let server = test_server_with_schema().await; + let router = test_router(server); + + // Drive a few requests through the middleware so the ring has + // something to report. + for _ in 0..3 { + let resp = router + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri("/torc-service/v1/ping") + .body(Body::empty()) + .expect("valid request"), + ) + .await + .expect("router response"); + assert_eq!(resp.status(), StatusCode::OK); + } + + let resp = router + .oneshot( + Request::builder() + .method("GET") + .uri("/torc-service/v1/admin/api-stats?window_seconds=60&interval_seconds=60") + .body(Body::empty()) + .expect("valid request"), + ) + .await + .expect("router response"); + assert_eq!(resp.status(), StatusCode::OK); + let bytes = resp + .into_body() + .collect() + .await + .expect("collect body") + .to_bytes(); + let snap: serde_json::Value = + serde_json::from_slice(&bytes).expect("valid api-stats response"); + let buckets = snap["buckets"].as_array().expect("buckets array"); + assert!(!buckets.is_empty()); + // 3 ping requests; the api-stats request itself is only recorded + // after its handler returns, so expect at least 3 + let total_requests: u64 = buckets + .iter() + .map(|b| b["request_count"].as_u64().unwrap_or(0)) + .sum(); + assert!( + total_requests >= 3, + "expected at least 3 recorded requests, got {total_requests}" + ); + } + + #[tokio::test] + async fn api_stats_records_unauthenticated_requests() { + let server = test_server_with_schema().await; + let stats = server.api_stats.clone(); + let router = test_router_with_auth(server, true); + + let response = router + .oneshot( + Request::builder() + .method("GET") + .uri("/torc-service/v1/ping") + .body(Body::empty()) + .expect("valid request"), + ) + .await + .expect("router response"); + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + + let snap =
stats.snapshot(chrono::Utc::now().timestamp_millis(), 60, 60); + let total: u64 = snap.buckets.iter().map(|b| b.request_count).sum(); + let total_4xx: u64 = snap.buckets.iter().map(|b| b.status_4xx).sum(); + assert_eq!(total, 1, "the 401 should still be recorded"); + assert_eq!(total_4xx, 1, "the 401 should be tallied as a 4xx"); + } + + #[test] + fn redact_for_subscriber_strips_bodies_when_not_opted_in() { + let mut event = ApiRequestEvent { + timestamp_ms: 1, + method: "POST".into(), + path: "/x".into(), + query: None, + status: 200, + latency_ms: 0, + request_id: None, + user: None, + request_body: Some(CapturedBody { + bytes: 5, + truncated: false, + text: Some("hello".into()), + }), + response_body: Some(CapturedBody { + bytes: 2, + truncated: false, + text: Some("ok".into()), + }), + }; + redact_for_subscriber(&mut event, false); + assert!(event.request_body.is_none()); + assert!(event.response_body.is_none()); + } + + #[test] + fn redact_for_subscriber_keeps_bodies_when_opted_in() { + let mut event = ApiRequestEvent { + timestamp_ms: 1, + method: "POST".into(), + path: "/x".into(), + query: None, + status: 200, + latency_ms: 0, + request_id: None, + user: None, + request_body: Some(CapturedBody { + bytes: 5, + truncated: false, + text: Some("hello".into()), + }), + response_body: None, + }; + redact_for_subscriber(&mut event, true); + assert!(event.request_body.is_some()); + } + + #[tokio::test] + async fn capture_short_circuits_on_request_body_error() { + let server = test_server_with_schema().await; + let bus = server.api_event_broadcaster.clone(); + // Hold a subscriber + body guard so the middleware tries to + // capture the request body. + let _rx = bus.subscribe(); + let _body_guard = bus.body_subscriber_guard(); + let router = test_router(server); + + // Stream yields a single error; the middleware's collect() will + // surface it. Content-Length keeps the size-known check happy + // so we actually exercise the collect path. 
+ let bad_stream = futures::stream::iter(vec![Err::<&[u8], std::io::Error>( + std::io::Error::other("boom"), + )]); + let bad_body = Body::from_stream(bad_stream); + + let response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/torc-service/v1/workflows") + .header(CONTENT_TYPE, "application/json") + .header(axum::http::header::CONTENT_LENGTH, "10") + .body(bad_body) + .expect("valid request"), + ) + .await + .expect("router response"); + + // Middleware short-circuits with 400 instead of forwarding an + // empty body to the handler. + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + } + + #[test] + fn body_size_known_within_cap_handles_inputs() { + use axum::http::HeaderMap; + // Both unknown -> can't safely capture + assert!(!body_size_known_within_cap(&HeaderMap::new(), None)); + + // Content-Length set, small -> ok + let mut headers = HeaderMap::new(); + headers.insert( + axum::http::header::CONTENT_LENGTH, + HeaderValue::from_static("128"), + ); + assert!(body_size_known_within_cap(&headers, None)); + + // Content-Length set, oversized -> reject + let mut headers = HeaderMap::new(); + headers.insert( + axum::http::header::CONTENT_LENGTH, + HeaderValue::from_static("99999999"), + ); + assert!(!body_size_known_within_cap(&headers, None)); + + // No Content-Length, but body advertises a small upper bound + assert!(body_size_known_within_cap(&HeaderMap::new(), Some(256))); + + // Both signals present, the larger one wins for the cap check + let mut headers = HeaderMap::new(); + headers.insert( + axum::http::header::CONTENT_LENGTH, + HeaderValue::from_static("128"), + ); + assert!(!body_size_known_within_cap(&headers, Some(99_999_999))); + } + + #[tokio::test] + async fn capture_middleware_falls_back_to_client_user_header() { + let server = test_server_with_schema().await; + let bus = server.api_event_broadcaster.clone(); + let mut rx = bus.subscribe(); + let router = test_router(server); + + let response = router + .oneshot( + 
Request::builder() + .method("GET") + .uri("/torc-service/v1/ping") + .header("X-Torc-Client-User", "alice") + .body(Body::empty()) + .expect("valid request"), + ) + .await + .expect("router response"); + assert_eq!(response.status(), StatusCode::OK); + + let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + .await + .expect("event broadcast within 1s") + .expect("broadcast succeeded"); + + // Without auth configured the resolved subject would be + // "anonymous"; the advisory header takes its place. + assert_eq!(event.user.as_deref(), Some("alice")); + } + + #[tokio::test] + async fn capture_middleware_streams_bodies_when_subscriber_requests_them() { + let server = test_server_with_schema().await; + let bus = server.api_event_broadcaster.clone(); + let mut rx = bus.subscribe(); + let _body_guard = bus.body_subscriber_guard(); + let router = test_router(server); + + let response = router + .oneshot( + Request::builder() + .method("GET") + .uri("/torc-service/v1/ping") + .body(Body::empty()) + .expect("valid request"), + ) + .await + .expect("router response"); + assert_eq!(response.status(), StatusCode::OK); + + let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + .await + .expect("event broadcast within 1s") + .expect("broadcast succeeded"); + + assert_eq!(event.path, "/torc-service/v1/ping"); + let resp_body = event + .response_body + .expect("response body should be captured"); + assert!(!resp_body.truncated); + assert!(resp_body.text.is_some()); + } + #[tokio::test] async fn router_falls_back_for_unknown_path() { let router = test_router(test_server_with_schema().await); @@ -4497,12 +5148,16 @@ mod live_router_tests { } fn test_router(server: Server) -> Router { + test_router_with_auth(server, false) + } + + fn test_router_with_auth(server: Server, require_auth: bool) -> Router { app_router(LiveRouterState { openapi_state: server.openapi_app_state(), server, auth: LiveAuthState { htpasswd: 
Arc::new(RwLock::new(None)), - require_auth: false, + require_auth, credential_cache: Arc::new(RwLock::new(None)), }, }) diff --git a/src/server/live_state.rs b/src/server/live_state.rs index cd796d53..e8e59a88 100644 --- a/src/server/live_state.rs +++ b/src/server/live_state.rs @@ -6,6 +6,8 @@ use crate::server::api::{ RoCrateApiImpl, SchedulersApiImpl, SlurmStatsApiImpl, UserDataApiImpl, WorkflowActionsApiImpl, WorkflowsApiImpl, }; +use crate::server::api_event_stream::ApiEventBroadcaster; +use crate::server::api_stats::ApiStatsRing; use crate::server::auth::{SharedCredentialCache, SharedHtpasswd}; use crate::server::authorization::AuthorizationService; use crate::server::event_broadcast::EventBroadcaster; @@ -21,6 +23,8 @@ pub struct LiveServerState { pub workflows_with_failures: Arc>>, pub authorization_service: AuthorizationService, pub event_broadcaster: EventBroadcaster, + pub api_event_broadcaster: ApiEventBroadcaster, + pub api_stats: ApiStatsRing, pub htpasswd: SharedHtpasswd, pub auth_file_path: Option, pub credential_cache: SharedCredentialCache, @@ -60,6 +64,8 @@ impl LiveServerState { workflows_with_failures: Arc::new(std::sync::RwLock::new(HashSet::new())), authorization_service, event_broadcaster: EventBroadcaster::new(512), + api_event_broadcaster: ApiEventBroadcaster::default(), + api_stats: ApiStatsRing::new(), htpasswd, auth_file_path, credential_cache,
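
For readers of this diff, the slot-reuse idea behind `ApiStatsRing::record` and `snapshot` can be sketched in isolation. This is a minimal illustration, not the PR's implementation: it assumes a 3600-slot ring (the real `BUCKET_COUNT` is not shown here, only the 3600s retention mentioned in the `ApiStatsQuery` docs), tracks request counts only, and uses `std::sync::Mutex`. The names `SecondRing`, `Slot`, `SLOTS`, and `count` are hypothetical.

```rust
use std::sync::Mutex;

/// Assumed slot count for this sketch (one slot per second, one hour total).
const SLOTS: usize = 3600;

#[derive(Clone, Copy, Default)]
struct Slot {
    /// The second this slot currently represents.
    start_secs: i64,
    requests: u64,
}

struct SecondRing {
    slots: Mutex<Vec<Slot>>,
}

impl SecondRing {
    fn new() -> Self {
        Self { slots: Mutex::new(vec![Slot::default(); SLOTS]) }
    }

    /// Map an absolute second onto a ring slot.
    fn slot_index(sec: i64) -> usize {
        sec.rem_euclid(SLOTS as i64) as usize
    }

    /// Count one request at `now_ms`, recycling the slot if it still
    /// holds data from an earlier lap around the ring.
    fn record(&self, now_ms: i64) {
        if now_ms <= 0 {
            return;
        }
        let sec = now_ms / 1000;
        let mut slots = self.slots.lock().unwrap();
        let slot = &mut slots[Self::slot_index(sec)];
        if slot.start_secs != sec {
            *slot = Slot { start_secs: sec, requests: 0 };
        }
        slot.requests += 1;
    }

    /// Sum requests in `[start_secs, start_secs + width_secs)`, skipping
    /// slots whose stored second does not match (stale data).
    fn count(&self, start_secs: i64, width_secs: i64) -> u64 {
        let slots = self.slots.lock().unwrap();
        (start_secs..start_secs + width_secs)
            .map(|sec| {
                let slot = &slots[Self::slot_index(sec)];
                if slot.start_secs == sec { slot.requests } else { 0 }
            })
            .sum()
    }
}
```

The stored `start_secs` is what keeps a recycled slot from leaking old counts into a newer window: a reader checks it against the second it expects, and a writer resets the slot when the second has changed.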