diff --git a/Cargo.toml b/Cargo.toml index 141cd67e..81cf984a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,13 @@ txtx-core = { version = "0.4.15" } txtx-gql = { version = "0.3.9" } txtx-supervisor-ui = { version = "0.2.10", default-features = false } +opentelemetry = { version = "0.28", default-features = false, features = ["metrics"] } +opentelemetry_sdk = { version = "0.28", default-features = false, features = ["rt-tokio", "metrics"] } +opentelemetry-otlp = { version = "0.28", default-features = false, features = ["metrics", "grpc-tonic"] } +opentelemetry-prometheus = { version = "0.28", default-features = false } +prometheus = { version = "0.13", default-features = false } +axum = { version = "0.8", default-features = false, features = ["tokio", "http1"] } + # [patch.crates-io] ## Local # txtx-addon-kit = { path = "../txtx/crates/txtx-addon-kit" } diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 1bf3e866..ed70b853 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -78,6 +78,7 @@ postgres = ["surfpool-gql/postgres", "surfpool-core/postgres"] version_check = [] subgraph = ["surfpool-core/subgraph"] register-tracing = ["surfpool-core/register-tracing"] +prometheus = ["surfpool-core/prometheus"] [target.'cfg(not(target_os = "windows"))'.dependencies] fork = "0.2.0" diff --git a/crates/cli/src/cli/mod.rs b/crates/cli/src/cli/mod.rs index 4060d318..cac92df3 100644 --- a/crates/cli/src/cli/mod.rs +++ b/crates/cli/src/cli/mod.rs @@ -20,6 +20,8 @@ use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_signer::{EncodableKey, Signer}; use surfpool_mcp::McpOptions; +#[cfg(feature = "prometheus")] +use surfpool_types::TelemetryConfig; use surfpool_types::{ AccountSnapshot, BlockProductionMode, CHANGE_TO_DEFAULT_STUDIO_PORT_ONCE_SUPERVISOR_MERGED, DEFAULT_DEVNET_RPC_URL, DEFAULT_GOSSIP_PORT, DEFAULT_MAINNET_RPC_URL, DEFAULT_NETWORK_HOST, @@ -260,6 +262,20 @@ pub struct StartSimnet { /// When multiple files are provided, later files override earlier ones for duplicate keys. #[arg(long = "snapshot")] pub snapshot: Vec, + /// Enable Prometheus metrics endpoint + #[cfg(feature = "prometheus")] + /// Enable Prometheus metrics endpoint + #[arg(long = "metrics-enabled", env = "SURFPOOL_METRICS_ENABLED")] + pub metrics_enabled: bool, + + #[cfg(feature = "prometheus")] + /// Prometheus metrics endpoint address + #[arg( + long = "metrics-addr", + default_value = "0.0.0.0:9000", + env = "SURFPOOL_METRICS_ADDR" + )] + pub metrics_addr: String, /// Skip signature verification for all transactions (eg. surfpool start --skip-signature-verification) #[clap(long = "skip-signature-verification", action=ArgAction::SetTrue, default_value = "false")] pub skip_signature_verification: bool, @@ -459,6 +475,15 @@ impl StartSimnet { subgraph: self.subgraph_config(), studio: self.studio_config(), plugin_config_path, + #[cfg(feature = "prometheus")] + telemetry: self.telemetry_config(), + } + } + #[cfg(feature = "prometheus")] + pub fn telemetry_config(&self) -> TelemetryConfig { + TelemetryConfig { + enabled: self.metrics_enabled, + prometheus_addr: self.metrics_addr.clone(), } } } diff --git a/crates/cli/src/cli/simnet/mod.rs b/crates/cli/src/cli/simnet/mod.rs index e0c69848..4ea19947 100644 --- a/crates/cli/src/cli/simnet/mod.rs +++ b/crates/cli/src/cli/simnet/mod.rs @@ -63,6 +63,28 @@ pub async fn handle_start_local_surfnet_command( let (mut surfnet_svm, simnet_events_rx, geyser_events_rx) = SurfnetSvm::new_with_db(cmd.db.as_deref(), &cmd.surfnet_id) .map_err(|e| format!("Failed to initialize Surfnet SVM: {}", e))?; + #[cfg(feature = "prometheus")] + { + let telemetry_config = cmd.telemetry_config(); + if telemetry_config.enabled { + match surfpool_core::telemetry::init_from_config( + telemetry_config.enabled, + &telemetry_config.prometheus_addr, + ) { + Err(e) => { + let _ = surfnet_svm + .simnet_events_tx + .send(SimnetEvent::warn(format!("Metrics init failed: {}", e))); + } + Ok(_) => { + let _ = surfnet_svm.simnet_events_tx.send(SimnetEvent::info(format!( + "Metrics available at http://{}/metrics", + telemetry_config.prometheus_addr + ))); + } + } + } + } // Apply feature configuration from CLI flags let feature_config = cmd.feature_config(); @@ -443,6 +465,21 @@ fn log_events( let _ = simnet_commands_tx .send(SimnetCommand::CompleteRunbookExecution(runbook_id, errors)); } + #[cfg(feature = "prometheus")] + SimnetEvent::MetricsData(metrics_data) => { + surfpool_core::telemetry::metrics().record_svm_state( + metrics_data.slot, + metrics_data.epoch, + metrics_data.slot_index, + metrics_data.transactions_count, + metrics_data.transactions_processed, + metrics_data.start_time, + metrics_data.signature_subs, + metrics_data.account_subs, + metrics_data.slot_subs, + metrics_data.logs_subs, + ); + } }, Err(_e) => { break; diff --git a/crates/cli/src/tui/simnet.rs b/crates/cli/src/tui/simnet.rs index ee236f26..d357275c 100644 --- a/crates/cli/src/tui/simnet.rs +++ b/crates/cli/src/tui/simnet.rs @@ -596,6 +596,21 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( ); app.status_bar_message = None; } + #[cfg(feature = "prometheus")] + SimnetEvent::MetricsData(metrics_data) => { + { + // In TUI mode, we don't need to record metrics to Prometheus + // because the metrics server is running in its own thread + // But we might want to display something in debug mode + if app.include_debug_logs { + new_events.push(( + EventType::Debug, + Local::now(), + format!("Metrics updated: slot={}", metrics_data.slot), + )); + } + } + } }, Err(_) => break, }, diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index b9529a4c..956e3216 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -100,7 +100,12 @@ anchor-lang-idl = { workspace = true } txtx-addon-kit = { workspace = true } txtx-addon-network-svm-types = { workspace = true } txtx-addon-network-svm = { workspace = true } - +# Prometheus metrics - declare normally, control via feature +opentelemetry = { version = "0.28", default-features = false, features = ["metrics"], optional = true } +opentelemetry_sdk = { version = "0.28", default-features = false, features = ["rt-tokio", "metrics"], optional = true } +opentelemetry-prometheus = { version = "0.28", default-features = false, optional = true } +prometheus = { version = "0.13", default-features = false, optional = true } +axum = { version = "0.8", default-features = false, features = ["tokio", "http1"], optional = true } [dev-dependencies] test-case = { workspace = true } @@ -116,3 +121,4 @@ ignore_tests_ci = [] geyser_plugin = [] # Disabled: solana-geyser-plugin-manager conflicts with litesvm 0.9.1 subgraph = ["surfpool-subgraph"] register-tracing = ["litesvm/register-tracing"] +prometheus = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-prometheus", "dep:prometheus", "dep:axum"] \ No newline at end of file diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 9f1d3ebd..147cb00c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -17,6 +17,7 @@ pub mod runloops; pub mod scenarios; pub mod storage; pub mod surfnet; +pub mod telemetry; pub mod types; use crossbeam_channel::{Receiver, Sender}; diff --git a/crates/core/src/rpc/admin.rs b/crates/core/src/rpc/admin.rs index 16661edf..bbd1c7f9 100644 --- a/crates/core/src/rpc/admin.rs +++ b/crates/core/src/rpc/admin.rs @@ -3,7 +3,7 @@ use std::time::Duration; use jsonrpc_core::{BoxFuture, Result}; use jsonrpc_derive::rpc; use solana_client::rpc_custom_error::RpcCustomError; -use surfpool_types::{SimnetCommand, SimnetEvent}; +use surfpool_types::{SimnetCommand, SimnetEvent, SurfpoolStatus, WsSubscriptions}; use txtx_addon_network_svm_types::subgraph::PluginConfig; use uuid::Uuid; @@ -192,6 +192,9 @@ pub trait AdminRpc { /// - This method is useful for monitoring system uptime and verifying system health. #[rpc(meta, name = "startTime")] fn start_time(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "surfpoolStatus")] + fn surfpool_status(&self, meta: Self::Metadata) -> Result; } pub struct SurfpoolAdminRpc; @@ -363,4 +366,56 @@ impl AdminRpc for SurfpoolAdminRpc { let datetime_utc: chrono::DateTime = system_time.into(); Ok(datetime_utc.to_rfc3339()) } + fn surfpool_status(&self, meta: Self::Metadata) -> Result { + // Ensure we have RunloopContext metadata + let Some(ctx) = meta else { + return Err(RpcCustomError::NodeUnhealthy { + num_slots_behind: None, + } + .into()); + }; + + // Read a snapshot of SVM state under a reader lock + // Read a consistent snapshot of SVM state + let status = ctx.svm_locker.with_svm_reader(|svm| { + // Epoch / slot info + let slot = svm.latest_epoch_info.absolute_slot; + let epoch = svm.latest_epoch_info.epoch; + let slot_index = svm.latest_epoch_info.slot_index; + + // transactions_count via Storage::count(); fall back to 0 on error + let transactions_count = svm.transactions.count().unwrap_or(0); + + // monotonic processed counter + let transactions_processed = svm.transactions_processed; + + // subscription counts (in-memory collections) + let signature_subscriptions = svm.signature_subscriptions.len(); + let account_subscriptions = svm.account_subscriptions.len(); + let slot_subscriptions = svm.slot_subscriptions.len(); + let logs_subscriptions = svm.logs_subscriptions.len(); + + // uptime in ms + let uptime_ms = match std::time::SystemTime::now().duration_since(svm.start_time) { + Ok(d) => d.as_millis() as u64, + Err(_) => 0, + }; + + SurfpoolStatus { + slot, + epoch, + slot_index, + transactions_count, + transactions_processed, + uptime_ms, + ws_subscriptions: WsSubscriptions { + signatures: signature_subscriptions, + accounts: account_subscriptions, + slots: slot_subscriptions, + logs: logs_subscriptions, + }, + } + }); + Ok(status) + } } diff --git a/crates/core/src/surfnet/locker.rs b/crates/core/src/surfnet/locker.rs index 18fe855e..d23183cb 100644 --- a/crates/core/src/surfnet/locker.rs +++ b/crates/core/src/surfnet/locker.rs @@ -51,6 +51,8 @@ use solana_transaction_status::{ TransactionConfirmationStatus as SolanaTransactionConfirmationStatus, UiConfirmedBlock, UiTransactionEncoding, }; +#[cfg(feature = "prometheus")] +use surfpool_types::MetricsData; use surfpool_types::{ AccountSnapshot, ComputeUnitsEstimationResult, ExecutionCapture, ExportSnapshotConfig, Idl, KeyedProfileResult, ProfileResult, RpcProfileResultConfig, RunbookExecutionStatusReport, @@ -1043,6 +1045,26 @@ impl SurfnetSvmLocker { self.with_svm_writer(|svm_writer| { svm_writer.write_executed_profile_result(signature, profile_result) })?; + + #[cfg(feature = "prometheus")] + { + let metric_data = self.with_svm_reader(|svm| MetricsData { + slot: svm.latest_epoch_info.absolute_slot, + epoch: svm.latest_epoch_info.epoch, + slot_index: svm.latest_epoch_info.slot_index, + transactions_count: svm.transactions.count().unwrap_or(0) as usize, + transactions_processed: svm.transactions_processed, + start_time: svm.start_time, + signature_subs: svm.signature_subscriptions.len(), + account_subs: svm.account_subscriptions.len(), + slot_subs: svm.slot_subscriptions.len(), + logs_subs: svm.logs_subscriptions.len(), + }); + let _ = self + .simnet_events_tx() + .send(SimnetEvent::MetricsData(metric_data)); + } + Ok(()) } diff --git a/crates/core/src/telemetry.rs b/crates/core/src/telemetry.rs new file mode 100644 index 00000000..d169857d --- /dev/null +++ b/crates/core/src/telemetry.rs @@ -0,0 +1,252 @@ +//! Prometheus metrics for Surfpool +//! +//! Feature `prometheus` enables a `/metrics` HTTP endpoint + +#[cfg(feature = "prometheus")] +use std::time::SystemTime; + +#[cfg(feature = "prometheus")] +mod instrumented { + use std::sync::{Once, OnceLock}; + + use opentelemetry::{ + KeyValue, + metrics::{Counter, Gauge, Meter, MeterProvider}, + }; + use opentelemetry_sdk::{Resource, metrics::SdkMeterProvider}; + use prometheus::Encoder; + + pub use super::*; + + static INIT: Once = Once::new(); + static METRICS: OnceLock = OnceLock::new(); + static METER_PROVIDER: OnceLock = OnceLock::new(); + + pub struct SurfpoolMetrics { + slot: Gauge, + epoch: Gauge, + slot_index: Gauge, + transactions_count: Gauge, + transactions_processed_total: Gauge, + uptime_seconds: Gauge, + ws_subscriptions_total: Gauge, + ws_signature_subscriptions: Gauge, + ws_account_subscriptions: Gauge, + ws_slot_subscriptions: Gauge, + ws_logs_subscriptions: Gauge, + transactions_processed: Counter, + } + + impl SurfpoolMetrics { + fn new(meter: Meter) -> Self { + Self { + slot: meter + .u64_gauge("surfpool_slot") + .with_description("Current slot height") + .build(), + epoch: meter + .u64_gauge("surfpool_epoch") + .with_description("Current epoch") + .build(), + slot_index: meter + .u64_gauge("surfpool_slot_index") + .with_description("Slot index within epoch") + .build(), + transactions_count: meter + .u64_gauge("surfpool_transactions_count") + .with_description("Number of transactions in storage") + .build(), + transactions_processed_total: meter + .u64_gauge("surfpool_transactions_processed_total") + .with_description("Total processed transactions") + .build(), + uptime_seconds: meter + .u64_gauge("surfpool_uptime_seconds") + .with_description("Time since start in seconds") + .build(), + ws_subscriptions_total: meter + .u64_gauge("surfpool_ws_subscriptions_total") + .with_description("Total WebSocket subscriptions") + .build(), + ws_signature_subscriptions: meter + .u64_gauge("surfpool_ws_signature_subscriptions") + .with_description("Signature subscriptions count") + .build(), + ws_account_subscriptions: meter + .u64_gauge("surfpool_ws_account_subscriptions") + .with_description("Account subscriptions count") + .build(), + ws_slot_subscriptions: meter + .u64_gauge("surfpool_ws_slot_subscriptions") + .with_description("Slot subscriptions count") + .build(), + ws_logs_subscriptions: meter + .u64_gauge("surfpool_ws_logs_subscriptions") + .with_description("Logs subscriptions count") + .build(), + transactions_processed: meter + .u64_counter("surfpool_transactions_processed") + .with_description("Transactions processed counter") + .build(), + } + } + + pub fn record_svm_state( + &self, + slot: u64, + epoch: u64, + slot_index: u64, + transactions_count: usize, + transactions_processed: u64, + start_time: SystemTime, + signature_subs: usize, + account_subs: usize, + slot_subs: usize, + logs_subs: usize, + ) { + let uptime_secs = SystemTime::now() + .duration_since(start_time) + .unwrap_or_default() + .as_secs(); + + self.slot.record(slot, &[]); + self.epoch.record(epoch, &[]); + self.slot_index.record(slot_index, &[]); + self.transactions_count + .record(transactions_count as u64, &[]); + self.transactions_processed_total + .record(transactions_processed, &[]); + self.uptime_seconds.record(uptime_secs, &[]); + + let total_subs = (signature_subs + account_subs + slot_subs + logs_subs) as u64; + self.ws_subscriptions_total.record(total_subs, &[]); + self.ws_signature_subscriptions + .record(signature_subs as u64, &[]); + self.ws_account_subscriptions + .record(account_subs as u64, &[]); + self.ws_slot_subscriptions.record(slot_subs as u64, &[]); + self.ws_logs_subscriptions.record(logs_subs as u64, &[]); + } + + pub fn increment_transactions_processed(&self, count: u64) { + self.transactions_processed.add(count, &[]); + } + } + + pub fn init_prometheus( + service_name: &str, + bind_addr: &str, + ) -> Result<(), Box> { + let service_name_owned = service_name.to_string(); + let bind_addr_owned = bind_addr.to_string(); + let mut result = Ok(()); + + INIT.call_once(|| { + let registry = prometheus::Registry::new(); + let exporter = match opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + { + Ok(exp) => exp, + Err(e) => { + result = Err(Box::new(e) as Box); + return; + } + }; + + let resource = Resource::builder() + .with_attributes(vec![KeyValue::new("service.name", service_name_owned)]) + .build(); + + let provider = SdkMeterProvider::builder() + .with_resource(resource) + .with_reader(exporter) + .build(); + + let meter = provider.meter("surfpool-core"); + let metrics = SurfpoolMetrics::new(meter); + + if let Err(e) = METER_PROVIDER.set(provider) { + result = Err(format!("Meter provider already initialized: {:?}", e).into()); + return; + } + if let Err(e) = METRICS.set(metrics) { + result = Err(format!("Metrics already initialized: {:?}", e).into()); + return; + } + + std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + eprintln!("Failed to create tokio runtime: {}", e); + return; + } + }; + rt.block_on(async { + let registry_clone = registry.clone(); + let app = axum::Router::new().route( + "/metrics", + axum::routing::get(move || { + let reg = registry_clone.clone(); + async move { + let encoder = prometheus::TextEncoder::new(); + let metric_families = reg.gather(); + let mut buffer = vec![]; + if let Err(e) = encoder.encode(&metric_families, &mut buffer) { + return ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to encode: {}", e), + ); + } + let body = String::from_utf8(buffer) + .unwrap_or_else(|_| "Invalid UTF8".to_string()); + (axum::http::StatusCode::OK, body) + } + }), + ); + let listener = match tokio::net::TcpListener::bind(&bind_addr_owned).await { + Ok(l) => l, + Err(e) => { + eprintln!("Failed to bind: {}", e); + return; + } + }; + if let Err(e) = axum::serve(listener, app).await { + eprintln!("Server error: {}", e); + } + }); + }); + }); + + result + } + + pub fn metrics() -> &'static SurfpoolMetrics { + METRICS + .get() + .expect("telemetry not initialized. Call init_prometheus() first") + } + + pub fn shutdown() { + if let Some(provider) = METER_PROVIDER.get() { + let _ = provider.shutdown(); + } + } +} + +#[cfg(feature = "prometheus")] +pub use instrumented::*; + +#[cfg(feature = "prometheus")] +pub fn init_from_config(enabled: bool, bind_addr: &str) -> Result<(), String> { + if !enabled { + log::info!("Prometheus metrics disabled"); + return Ok(()); + } + log::info!("Starting Prometheus metrics on {}", bind_addr); + init_prometheus("surfpool", bind_addr).map_err(|e| format!("Prometheus init failed: {}", e)) +} diff --git a/crates/types/src/types.rs b/crates/types/src/types.rs index 7cdc3f8a..5ba7a874 100644 --- a/crates/types/src/types.rs +++ b/crates/types/src/types.rs @@ -4,6 +4,7 @@ use std::{ fmt, path::PathBuf, str::FromStr, + time::SystemTime, }; use blake3::Hash; @@ -417,6 +418,8 @@ pub enum SubgraphCommand { #[derive(Debug)] pub enum SimnetEvent { /// Surfnet is ready, with the initial count of processed transactions from storage + #[cfg(feature = "prometheus")] + MetricsData(MetricsData), Ready(u64), Connected(String), Aborted(String), @@ -593,16 +596,19 @@ pub struct SanitizedConfig { pub workspace: Option, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct SurfpoolConfig { pub simnets: Vec, pub rpc: RpcConfig, pub subgraph: SubgraphConfig, pub studio: StudioConfig, pub plugin_config_path: Vec, + #[cfg(feature = "prometheus")] + #[serde(default)] + pub telemetry: TelemetryConfig, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct SimnetConfig { pub offline_mode: bool, pub remote_rpc_url: Option, @@ -662,14 +668,14 @@ impl SimnetConfig { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct SubgraphConfig {} pub const DEFAULT_GOSSIP_PORT: u16 = 8001; pub const DEFAULT_TPU_PORT: u16 = 8003; pub const DEFAULT_TPU_QUIC_PORT: u16 = 8004; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct RpcConfig { pub bind_host: String, pub bind_port: u16, @@ -701,7 +707,7 @@ impl Default for RpcConfig { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct StudioConfig { pub bind_host: String, pub bind_port: u16, @@ -1256,6 +1262,54 @@ impl RunbookExecutionStatusReport { self.errors = error; } } +/// WebSocket subscription counts +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +pub struct WsSubscriptions { + pub signatures: usize, + pub accounts: usize, + pub slots: usize, + pub logs: usize, +} + +/// Surfpool node status information +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SurfpoolStatus { + pub slot: u64, + pub epoch: u64, + pub slot_index: u64, + pub transactions_count: u64, + pub transactions_processed: u64, + pub uptime_ms: u64, + pub ws_subscriptions: WsSubscriptions, +} + +#[cfg(feature = "prometheus")] +#[derive(Clone, Debug)] +pub struct MetricsData { + pub slot: u64, + pub epoch: u64, + pub slot_index: u64, + pub transactions_count: usize, + pub transactions_processed: u64, + pub start_time: SystemTime, + pub signature_subs: usize, + pub account_subs: usize, + pub slot_subs: usize, + pub logs_subs: usize, +} + +#[cfg(feature = "prometheus")] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct TelemetryConfig { + pub enabled: bool, + #[serde(default = "default_prometheus_addr")] + pub prometheus_addr: String, +} + +#[cfg(feature = "prometheus")] +fn default_prometheus_addr() -> String { + "0.0.0.0:9000".to_string() +} #[cfg(test)] mod tests {