From 125107f454d4209e8ca76e1aae8bdb7f9da6bd5b Mon Sep 17 00:00:00 2001 From: Danish Ansari Date: Thu, 5 Mar 2026 08:34:21 +0530 Subject: [PATCH 1/3] feat(telemetry): Add Prometheus metrics endpoint for node monitoring --- Cargo.toml | 7 + crates/cli/Cargo.toml | 1 + crates/cli/src/cli/mod.rs | 19 +- crates/cli/src/cli/simnet/mod.rs | 31 +++ crates/cli/src/tui/simnet.rs | 15 ++ crates/core/Cargo.toml | 8 +- crates/core/src/lib.rs | 1 + crates/core/src/rpc/admin.rs | 57 +++++- crates/core/src/surfnet/locker.rs | 23 ++- crates/core/src/telemetry.rs | 316 ++++++++++++++++++++++++++++++ crates/types/src/types.rs | 59 +++++- 11 files changed, 526 insertions(+), 11 deletions(-) create mode 100644 crates/core/src/telemetry.rs diff --git a/Cargo.toml b/Cargo.toml index 141cd67e..81cf984a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,13 @@ txtx-core = { version = "0.4.15" } txtx-gql = { version = "0.3.9" } txtx-supervisor-ui = { version = "0.2.10", default-features = false } +opentelemetry = { version = "0.28", default-features = false, features = ["metrics"] } +opentelemetry_sdk = { version = "0.28", default-features = false, features = ["rt-tokio", "metrics"] } +opentelemetry-otlp = { version = "0.28", default-features = false, features = ["metrics", "grpc-tonic"] } +opentelemetry-prometheus = { version = "0.28", default-features = false } +prometheus = { version = "0.13", default-features = false } +axum = { version = "0.8", default-features = false, features = ["tokio", "http1"] } + # [patch.crates-io] ## Local # txtx-addon-kit = { path = "../txtx/crates/txtx-addon-kit" } diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 1bf3e866..ed70b853 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -78,6 +78,7 @@ postgres = ["surfpool-gql/postgres", "surfpool-core/postgres"] version_check = [] subgraph = ["surfpool-core/subgraph"] register-tracing = ["surfpool-core/register-tracing"] +prometheus = ["surfpool-core/prometheus"] [target.'cfg(not(target_os = "windows"))'.dependencies] fork = "0.2.0" diff --git a/crates/cli/src/cli/mod.rs b/crates/cli/src/cli/mod.rs index 4060d318..ceadcf48 100644 --- a/crates/cli/src/cli/mod.rs +++ b/crates/cli/src/cli/mod.rs @@ -25,7 +25,7 @@ use surfpool_types::{ DEFAULT_DEVNET_RPC_URL, DEFAULT_GOSSIP_PORT, DEFAULT_MAINNET_RPC_URL, DEFAULT_NETWORK_HOST, DEFAULT_RPC_PORT, DEFAULT_SLOT_TIME_MS, DEFAULT_TESTNET_RPC_URL, DEFAULT_TPU_PORT, DEFAULT_TPU_QUIC_PORT, DEFAULT_WS_PORT, RpcConfig, SimnetConfig, SimnetEvent, StudioConfig, - SubgraphConfig, SurfpoolConfig, SvmFeature, SvmFeatureConfig, + SubgraphConfig, SurfpoolConfig, SvmFeature, SvmFeatureConfig, TelemetryConfig, }; use txtx_cloud::LoginCommand; use txtx_core::manifest::WorkspaceManifest; @@ -260,6 +260,16 @@ pub struct StartSimnet { /// When multiple files are provided, later files override earlier ones for duplicate keys. #[arg(long = "snapshot")] pub snapshot: Vec, + /// Enable Prometheus metrics endpoint + #[arg(long = "metrics-enabled", env = "SURFPOOL_METRICS_ENABLED")] + pub metrics_enabled: bool, + /// Prometheus metrics endpoint address + #[arg( + long = "metrics-addr", + default_value = "0.0.0.0:9000", + env = "SURFPOOL_METRICS_ADDR" + )] + pub metrics_addr: String, /// Skip signature verification for all transactions (eg. surfpool start --skip-signature-verification) #[clap(long = "skip-signature-verification", action=ArgAction::SetTrue, default_value = "false")] pub skip_signature_verification: bool, @@ -459,6 +469,13 @@ impl StartSimnet { subgraph: self.subgraph_config(), studio: self.studio_config(), plugin_config_path, + telemetry: self.telemetry_config(), + } + } + pub fn telemetry_config(&self) -> TelemetryConfig { + TelemetryConfig { + enabled: self.metrics_enabled, + prometheus_addr: self.metrics_addr.clone(), } } } diff --git a/crates/cli/src/cli/simnet/mod.rs b/crates/cli/src/cli/simnet/mod.rs index e0c69848..36a46f6f 100644 --- a/crates/cli/src/cli/simnet/mod.rs +++ b/crates/cli/src/cli/simnet/mod.rs @@ -63,6 +63,20 @@ pub async fn handle_start_local_surfnet_command( let (mut surfnet_svm, simnet_events_rx, geyser_events_rx) = SurfnetSvm::new_with_db(cmd.db.as_deref(), &cmd.surfnet_id) .map_err(|e| format!("Failed to initialize Surfnet SVM: {}", e))?; + let telemetry_config = cmd.telemetry_config(); + if let Err(e) = surfpool_core::telemetry::init_from_config( + telemetry_config.enabled, + &telemetry_config.prometheus_addr, + ) { + let _ = surfnet_svm + .simnet_events_tx + .send(SimnetEvent::warn(format!("Metrics init failed: {}", e))); + } else if telemetry_config.enabled { + let _ = surfnet_svm.simnet_events_tx.send(SimnetEvent::info(format!( + "Metrics available at http://{}/metrics", + telemetry_config.prometheus_addr + ))); + } // Apply feature configuration from CLI flags let feature_config = cmd.feature_config(); @@ -443,6 +457,23 @@ fn log_events( let _ = simnet_commands_tx .send(SimnetCommand::CompleteRunbookExecution(runbook_id, errors)); } + SimnetEvent::MetricsData(metrics_data) => { + #[cfg(feature = "prometheus")] + { + surfpool_core::telemetry::metrics().record_svm_state( + metrics_data.slot, + metrics_data.epoch, + metrics_data.slot_index, + metrics_data.transactions_count, + metrics_data.transactions_processed, + metrics_data.start_time, + metrics_data.signature_subs, + metrics_data.account_subs, + metrics_data.slot_subs, + metrics_data.logs_subs, + ); + } + } }, Err(_e) => { break; diff --git a/crates/cli/src/tui/simnet.rs b/crates/cli/src/tui/simnet.rs index ee236f26..ca15ddaa 100644 --- a/crates/cli/src/tui/simnet.rs +++ b/crates/cli/src/tui/simnet.rs @@ -596,6 +596,21 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( ); app.status_bar_message = None; } + SimnetEvent::MetricsData(metrics_data) => { + #[cfg(feature = "prometheus")] + { + // In TUI mode, we don't need to record metrics to Prometheus + // because the metrics server is running in its own thread + // But we might want to display something in debug mode + if app.include_debug_logs { + new_events.push(( + EventType::Debug, + Local::now(), + format!("Metrics updated: slot={}", metrics_data.slot), + )); + } + } + } }, Err(_) => break, }, diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index b9529a4c..956e3216 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -100,7 +100,12 @@ anchor-lang-idl = { workspace = true } txtx-addon-kit = { workspace = true } txtx-addon-network-svm-types = { workspace = true } txtx-addon-network-svm = { workspace = true } - +# Prometheus metrics - declare normally, control via feature +opentelemetry = { version = "0.28", default-features = false, features = ["metrics"], optional = true } +opentelemetry_sdk = { version = "0.28", default-features = false, features = ["rt-tokio", "metrics"], optional = true } +opentelemetry-prometheus = { version = "0.28", default-features = false, optional = true } +prometheus = { version = "0.13", default-features = false, optional = true } +axum = { version = "0.8", default-features = false, features = ["tokio", "http1"], optional = true } [dev-dependencies] test-case = { workspace = true } @@ -116,3 +121,4 @@ ignore_tests_ci = [] geyser_plugin = [] # Disabled: solana-geyser-plugin-manager conflicts with litesvm 0.9.1 subgraph = ["surfpool-subgraph"] register-tracing = ["litesvm/register-tracing"] +prometheus = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-prometheus", "dep:prometheus", "dep:axum"] \ No newline at end of file diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 9f1d3ebd..147cb00c 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -17,6 +17,7 @@ pub mod runloops; pub mod scenarios; pub mod storage; pub mod surfnet; +pub mod telemetry; pub mod types; use crossbeam_channel::{Receiver, Sender}; diff --git a/crates/core/src/rpc/admin.rs b/crates/core/src/rpc/admin.rs index 16661edf..bbd1c7f9 100644 --- a/crates/core/src/rpc/admin.rs +++ b/crates/core/src/rpc/admin.rs @@ -3,7 +3,7 @@ use std::time::Duration; use jsonrpc_core::{BoxFuture, Result}; use jsonrpc_derive::rpc; use solana_client::rpc_custom_error::RpcCustomError; -use surfpool_types::{SimnetCommand, SimnetEvent}; +use surfpool_types::{SimnetCommand, SimnetEvent, SurfpoolStatus, WsSubscriptions}; use txtx_addon_network_svm_types::subgraph::PluginConfig; use uuid::Uuid; @@ -192,6 +192,9 @@ pub trait AdminRpc { /// - This method is useful for monitoring system uptime and verifying system health. #[rpc(meta, name = "startTime")] fn start_time(&self, meta: Self::Metadata) -> Result; + + #[rpc(meta, name = "surfpoolStatus")] + fn surfpool_status(&self, meta: Self::Metadata) -> Result; } pub struct SurfpoolAdminRpc; @@ -363,4 +366,56 @@ impl AdminRpc for SurfpoolAdminRpc { let datetime_utc: chrono::DateTime = system_time.into(); Ok(datetime_utc.to_rfc3339()) } + fn surfpool_status(&self, meta: Self::Metadata) -> Result { + // Ensure we have RunloopContext metadata + let Some(ctx) = meta else { + return Err(RpcCustomError::NodeUnhealthy { + num_slots_behind: None, + } + .into()); + }; + + // Read a snapshot of SVM state under a reader lock + // Read a consistent snapshot of SVM state + let status = ctx.svm_locker.with_svm_reader(|svm| { + // Epoch / slot info + let slot = svm.latest_epoch_info.absolute_slot; + let epoch = svm.latest_epoch_info.epoch; + let slot_index = svm.latest_epoch_info.slot_index; + + // transactions_count via Storage::count(); fall back to 0 on error + let transactions_count = svm.transactions.count().unwrap_or(0); + + // monotonic processed counter + let transactions_processed = svm.transactions_processed; + + // subscription counts (in-memory collections) + let signature_subscriptions = svm.signature_subscriptions.len(); + let account_subscriptions = svm.account_subscriptions.len(); + let slot_subscriptions = svm.slot_subscriptions.len(); + let logs_subscriptions = svm.logs_subscriptions.len(); + + // uptime in ms + let uptime_ms = match std::time::SystemTime::now().duration_since(svm.start_time) { + Ok(d) => d.as_millis() as u64, + Err(_) => 0, + }; + + SurfpoolStatus { + slot, + epoch, + slot_index, + transactions_count, + transactions_processed, + uptime_ms, + ws_subscriptions: WsSubscriptions { + signatures: signature_subscriptions, + accounts: account_subscriptions, + slots: slot_subscriptions, + logs: logs_subscriptions, + }, + } + }); + Ok(status) + } } diff --git a/crates/core/src/surfnet/locker.rs b/crates/core/src/surfnet/locker.rs index 18fe855e..254d4eb9 100644 --- a/crates/core/src/surfnet/locker.rs +++ b/crates/core/src/surfnet/locker.rs @@ -53,9 +53,9 @@ use solana_transaction_status::{ }; use surfpool_types::{ AccountSnapshot, ComputeUnitsEstimationResult, ExecutionCapture, ExportSnapshotConfig, Idl, - KeyedProfileResult, ProfileResult, RpcProfileResultConfig, RunbookExecutionStatusReport, - SimnetCommand, SimnetEvent, TransactionConfirmationStatus, TransactionStatusEvent, - UiKeyedProfileResult, UuidOrSignature, VersionedIdl, + KeyedProfileResult, MetricsData, ProfileResult, RpcProfileResultConfig, + RunbookExecutionStatusReport, SimnetCommand, SimnetEvent, TransactionConfirmationStatus, + TransactionStatusEvent, UiKeyedProfileResult, UuidOrSignature, VersionedIdl, }; use tokio::sync::RwLock; use txtx_addon_kit::indexmap::IndexSet; @@ -1043,6 +1043,23 @@ impl SurfnetSvmLocker { self.with_svm_writer(|svm_writer| { svm_writer.write_executed_profile_result(signature, profile_result) })?; + + let metric_data = self.with_svm_reader(|svm| MetricsData { + slot: svm.latest_epoch_info.absolute_slot, + epoch: svm.latest_epoch_info.epoch, + slot_index: svm.latest_epoch_info.slot_index, + transactions_count: svm.transactions.count().unwrap_or(0) as usize, + transactions_processed: svm.transactions_processed, + start_time: svm.start_time, + signature_subs: svm.signature_subscriptions.len(), + account_subs: svm.account_subscriptions.len(), + slot_subs: svm.slot_subscriptions.len(), + logs_subs: svm.logs_subscriptions.len(), + }); + let _ = self + .simnet_events_tx() + .send(SimnetEvent::MetricsData(metric_data)); + Ok(()) } diff --git a/crates/core/src/telemetry.rs b/crates/core/src/telemetry.rs new file mode 100644 index 00000000..863d57fd --- /dev/null +++ b/crates/core/src/telemetry.rs @@ -0,0 +1,316 @@ +//! Prometheus metrics for Surfpool +//! +//! Feature `prometheus` enables a `/metrics` HTTP endpoint + +use std::time::SystemTime; + +#[cfg(feature = "prometheus")] +mod instrumented { + use std::sync::{Once, OnceLock}; + + use opentelemetry::{ + KeyValue, + metrics::{Counter, Gauge, Meter, MeterProvider}, + }; + use opentelemetry_sdk::{Resource, metrics::SdkMeterProvider}; + use prometheus::Encoder; + + pub use super::*; + + static INIT: Once = Once::new(); + static METRICS: OnceLock = OnceLock::new(); + static METER_PROVIDER: OnceLock = OnceLock::new(); + + pub struct SurfpoolMetrics { + slot: Gauge, + epoch: Gauge, + slot_index: Gauge, + transactions_count: Gauge, + transactions_processed_total: Gauge, + uptime_seconds: Gauge, + ws_subscriptions_total: Gauge, + ws_signature_subscriptions: Gauge, + ws_account_subscriptions: Gauge, + ws_slot_subscriptions: Gauge, + ws_logs_subscriptions: Gauge, + transactions_processed: Counter, + } + + impl SurfpoolMetrics { + fn new(meter: Meter) -> Self { + Self { + slot: meter + .u64_gauge("surfpool_slot") + .with_description("Current slot height") + .build(), + epoch: meter + .u64_gauge("surfpool_epoch") + .with_description("Current epoch") + .build(), + slot_index: meter + .u64_gauge("surfpool_slot_index") + .with_description("Slot index within epoch") + .build(), + transactions_count: meter + .u64_gauge("surfpool_transactions_count") + .with_description("Number of transactions in storage") + .build(), + transactions_processed_total: meter + .u64_gauge("surfpool_transactions_processed_total") + .with_description("Total processed transactions") + .build(), + uptime_seconds: meter + .u64_gauge("surfpool_uptime_seconds") + .with_description("Time since start in seconds") + .build(), + ws_subscriptions_total: meter + .u64_gauge("surfpool_ws_subscriptions_total") + .with_description("Total WebSocket subscriptions") + .build(), + ws_signature_subscriptions: meter + .u64_gauge("surfpool_ws_signature_subscriptions") + .with_description("Signature subscriptions count") + .build(), + ws_account_subscriptions: meter + .u64_gauge("surfpool_ws_account_subscriptions") + .with_description("Account subscriptions count") + .build(), + ws_slot_subscriptions: meter + .u64_gauge("surfpool_ws_slot_subscriptions") + .with_description("Slot subscriptions count") + .build(), + ws_logs_subscriptions: meter + .u64_gauge("surfpool_ws_logs_subscriptions") + .with_description("Logs subscriptions count") + .build(), + transactions_processed: meter + .u64_counter("surfpool_transactions_processed") + .with_description("Transactions processed counter") + .build(), + } + } + + pub fn record_svm_state( + &self, + slot: u64, + epoch: u64, + slot_index: u64, + transactions_count: usize, + transactions_processed: u64, + start_time: SystemTime, + signature_subs: usize, + account_subs: usize, + slot_subs: usize, + logs_subs: usize, + ) { + let uptime_secs = SystemTime::now() + .duration_since(start_time) + .unwrap_or_default() + .as_secs(); + + self.slot.record(slot, &[]); + self.epoch.record(epoch, &[]); + self.slot_index.record(slot_index, &[]); + self.transactions_count + .record(transactions_count as u64, &[]); + self.transactions_processed_total + .record(transactions_processed, &[]); + self.uptime_seconds.record(uptime_secs, &[]); + + let total_subs = (signature_subs + account_subs + slot_subs + logs_subs) as u64; + self.ws_subscriptions_total.record(total_subs, &[]); + self.ws_signature_subscriptions + .record(signature_subs as u64, &[]); + self.ws_account_subscriptions + .record(account_subs as u64, &[]); + self.ws_slot_subscriptions.record(slot_subs as u64, &[]); + self.ws_logs_subscriptions.record(logs_subs as u64, &[]); + } + + pub fn increment_transactions_processed(&self, count: u64) { + self.transactions_processed.add(count, &[]); + } + } + + pub fn init_prometheus( + service_name: &str, + bind_addr: &str, + ) -> Result<(), Box> { + let service_name_owned = service_name.to_string(); + let bind_addr_owned = bind_addr.to_string(); + + let mut result = Ok(()); + + INIT.call_once(|| { + // Create prometheus registry + let registry = prometheus::Registry::new(); + + // Create prometheus exporter using the new 0.28 API + // In 0.28, opentelemetry-prometheus uses a different approach + let exporter = match opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + { + Ok(exp) => exp, + Err(e) => { + result = Err(Box::new(e) as Box); + return; + } + }; + + // Build resource using 0.28 API + let resource = Resource::builder() + .with_attributes(vec![KeyValue::new("service.name", service_name_owned)]) + .build(); + + // Build meter provider using 0.28 API + let provider = SdkMeterProvider::builder() + .with_resource(resource) + .with_reader(exporter) + .build(); + + let meter = provider.meter("surfpool-core"); + let metrics = SurfpoolMetrics::new(meter); + + if METER_PROVIDER.set(provider).is_err() { + result = Err("Meter provider already initialized".into()); + return; + } + + if METRICS.set(metrics).is_err() { + result = Err("Metrics already initialized".into()); + return; + } + + // Spawn HTTP server in a blocking thread + std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + eprintln!("Failed to create tokio runtime: {}", e); + return; + } + }; + + rt.block_on(async { + let registry_clone = registry.clone(); + + // Build axum 0.8 router with new path syntax + let app = axum::Router::new().route( + "/metrics", + axum::routing::get(move || { + let reg = registry_clone.clone(); + async move { + let encoder = prometheus::TextEncoder::new(); + let metric_families = reg.gather(); + + let mut buffer = vec![]; + if let Err(e) = encoder.encode(&metric_families, &mut buffer) { + return ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to encode: {}", e), + ); + } + + let body = String::from_utf8(buffer) + .unwrap_or_else(|_| "Invalid UTF8".to_string()); + + (axum::http::StatusCode::OK, body) + } + }), + ); + + let listener = match tokio::net::TcpListener::bind(&bind_addr_owned).await { + Ok(l) => l, + Err(e) => { + eprintln!("Failed to bind: {}", e); + return; + } + }; + if let Err(e) = axum::serve(listener, app).await { + eprintln!("Server error: {}", e); + } + }); + }); + }); + + result + } + + pub fn metrics() -> &'static SurfpoolMetrics { + METRICS + .get() + .expect("telemetry not initialized. Call init_prometheus() first") + } + + pub fn shutdown() { + if let Some(provider) = METER_PROVIDER.get() { + let _ = provider.shutdown(); + } + } +} + +#[cfg(not(feature = "prometheus"))] +mod instrumented { + use super::*; + + pub struct SurfpoolMetrics; + + impl SurfpoolMetrics { + pub fn record_svm_state( + &self, + _slot: u64, + _epoch: u64, + _slot_index: u64, + _transactions_count: usize, + _transactions_processed: u64, + _start_time: SystemTime, + _signature_subs: usize, + _account_subs: usize, + _slot_subs: usize, + _logs_subs: usize, + ) { + } + pub fn increment_transactions_processed(&self, _count: u64) {} + } + + pub fn init_prometheus( + _service_name: &str, + _bind_addr: &str, + ) -> Result<(), Box> { + Ok(()) + } + + pub fn metrics() -> SurfpoolMetrics { + SurfpoolMetrics + } + + pub fn shutdown() {} +} + +pub use instrumented::*; + +pub fn init_from_config(enabled: bool, bind_addr: &str) -> Result<(), String> { + #[cfg(feature = "prometheus")] + { + if !enabled { + log::info!("Prometheus metrics disabled"); + return Ok(()); + } + log::info!("Starting Prometheus metrics on {}", bind_addr); + init_prometheus("surfpool", bind_addr) + .map_err(|e| format!("Prometheus init failed: {}", e))?; + } + #[cfg(not(feature = "prometheus"))] + { + if enabled { + log::warn!( + "Prometheus enabled but feature not compiled in. Rebuild with --features prometheus" + ); + } + } + Ok(()) +} diff --git a/crates/types/src/types.rs b/crates/types/src/types.rs index 7cdc3f8a..6cc75ab6 100644 --- a/crates/types/src/types.rs +++ b/crates/types/src/types.rs @@ -4,6 +4,7 @@ use std::{ fmt, path::PathBuf, str::FromStr, + time::SystemTime, }; use blake3::Hash; @@ -417,6 +418,7 @@ pub enum SubgraphCommand { #[derive(Debug)] pub enum SimnetEvent { /// Surfnet is ready, with the initial count of processed transactions from storage + MetricsData(MetricsData), Ready(u64), Connected(String), Aborted(String), @@ -593,16 +595,18 @@ pub struct SanitizedConfig { pub workspace: Option, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct SurfpoolConfig { pub simnets: Vec, pub rpc: RpcConfig, pub subgraph: SubgraphConfig, pub studio: StudioConfig, pub plugin_config_path: Vec, + #[serde(default)] + pub telemetry: TelemetryConfig, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct SimnetConfig { pub offline_mode: bool, pub remote_rpc_url: Option, @@ -662,14 +666,14 @@ impl SimnetConfig { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct SubgraphConfig {} pub const DEFAULT_GOSSIP_PORT: u16 = 8001; pub const DEFAULT_TPU_PORT: u16 = 8003; pub const DEFAULT_TPU_QUIC_PORT: u16 = 8004; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct RpcConfig { pub bind_host: String, pub bind_port: u16, @@ -701,7 +705,7 @@ impl Default for RpcConfig { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct StudioConfig { pub bind_host: String, pub bind_port: u16, @@ -1256,6 +1260,51 @@ impl RunbookExecutionStatusReport { self.errors = error; } } +/// WebSocket subscription counts +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +pub struct WsSubscriptions { + pub signatures: usize, + pub accounts: usize, + pub slots: usize, + pub logs: usize, +} + +/// Surfpool node status information +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SurfpoolStatus { + pub slot: u64, + pub epoch: u64, + pub slot_index: u64, + pub transactions_count: u64, + pub transactions_processed: u64, + pub uptime_ms: u64, + pub ws_subscriptions: WsSubscriptions, +} + +#[derive(Clone, Debug)] +pub struct MetricsData { + pub slot: u64, + pub epoch: u64, + pub slot_index: u64, + pub transactions_count: usize, + pub transactions_processed: u64, + pub start_time: SystemTime, + pub signature_subs: usize, + pub account_subs: usize, + pub slot_subs: usize, + pub logs_subs: usize, +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct TelemetryConfig { + pub enabled: bool, + #[serde(default = "default_prometheus_addr")] + pub prometheus_addr: String, // Just String, not Option +} + +fn default_prometheus_addr() -> String { + "0.0.0.0:9000".to_string() +} #[cfg(test)] mod tests { From 9a3667c2c395321174a9d6de15b6ce931ddbb55d Mon Sep 17 00:00:00 2001 From: Danish Ansari Date: Fri, 6 Mar 2026 10:40:14 +0530 Subject: [PATCH 2/3] fix(telemetry): remove no-op stubs and fully cfg-gate MetricsData, TelemetryConfig, CLI flags, and telemetry mod behind prometheus feature --- crates/cli/src/cli/mod.rs | 11 +- crates/cli/src/cli/simnet/mod.rs | 36 +++--- crates/cli/src/tui/simnet.rs | 2 +- crates/core/src/surfnet/locker.rs | 37 +++--- crates/core/src/telemetry.rs | 190 ++++++------------------------ crates/types/src/types.rs | 7 +- 6 files changed, 96 insertions(+), 187 deletions(-) diff --git a/crates/cli/src/cli/mod.rs b/crates/cli/src/cli/mod.rs index ceadcf48..2c2037d9 100644 --- a/crates/cli/src/cli/mod.rs +++ b/crates/cli/src/cli/mod.rs @@ -25,12 +25,15 @@ use surfpool_types::{ DEFAULT_DEVNET_RPC_URL, DEFAULT_GOSSIP_PORT, DEFAULT_MAINNET_RPC_URL, DEFAULT_NETWORK_HOST, DEFAULT_RPC_PORT, DEFAULT_SLOT_TIME_MS, DEFAULT_TESTNET_RPC_URL, DEFAULT_TPU_PORT, DEFAULT_TPU_QUIC_PORT, DEFAULT_WS_PORT, RpcConfig, SimnetConfig, SimnetEvent, StudioConfig, - SubgraphConfig, SurfpoolConfig, SvmFeature, SvmFeatureConfig, TelemetryConfig, + SubgraphConfig, SurfpoolConfig, SvmFeature, SvmFeatureConfig, }; use txtx_cloud::LoginCommand; use txtx_core::manifest::WorkspaceManifest; use txtx_gql::kit::{helpers::fs::FileLocation, types::frontend::LogLevel}; +#[cfg(feature = "prometheus")] +use surfpool_types::TelemetryConfig; + use crate::{cloud::CloudStartCommand, runbook::handle_execute_runbook_command}; mod simnet; @@ -261,8 +264,12 @@ pub struct StartSimnet { #[arg(long = "snapshot")] pub snapshot: Vec, /// Enable Prometheus metrics endpoint + #[cfg(feature = "prometheus")] + /// Enable Prometheus metrics endpoint #[arg(long = "metrics-enabled", env = "SURFPOOL_METRICS_ENABLED")] pub metrics_enabled: bool, + + #[cfg(feature = "prometheus")] /// Prometheus metrics endpoint address #[arg( long = "metrics-addr", @@ -469,9 +476,11 @@ impl StartSimnet { subgraph: self.subgraph_config(), studio: self.studio_config(), plugin_config_path, + #[cfg(feature = "prometheus")] telemetry: self.telemetry_config(), } } + #[cfg(feature = "prometheus")] pub fn telemetry_config(&self) -> TelemetryConfig { TelemetryConfig { enabled: self.metrics_enabled, diff --git a/crates/cli/src/cli/simnet/mod.rs b/crates/cli/src/cli/simnet/mod.rs index 36a46f6f..321bc99c 100644 --- a/crates/cli/src/cli/simnet/mod.rs +++ b/crates/cli/src/cli/simnet/mod.rs @@ -63,19 +63,27 @@ pub async fn handle_start_local_surfnet_command( let (mut surfnet_svm, simnet_events_rx, geyser_events_rx) = SurfnetSvm::new_with_db(cmd.db.as_deref(), &cmd.surfnet_id) .map_err(|e| format!("Failed to initialize Surfnet SVM: {}", e))?; - let telemetry_config = cmd.telemetry_config(); - if let Err(e) = surfpool_core::telemetry::init_from_config( - telemetry_config.enabled, - &telemetry_config.prometheus_addr, - ) { - let _ = surfnet_svm - .simnet_events_tx - .send(SimnetEvent::warn(format!("Metrics init failed: {}", e))); - } else if telemetry_config.enabled { - let _ = surfnet_svm.simnet_events_tx.send(SimnetEvent::info(format!( - "Metrics available at http://{}/metrics", - telemetry_config.prometheus_addr - ))); + #[cfg(feature = "prometheus")] + { + let telemetry_config = cmd.telemetry_config(); + if telemetry_config.enabled { + match surfpool_core::telemetry::init_from_config( + telemetry_config.enabled, + &telemetry_config.prometheus_addr, + ) { + Err(e) => { + let _ = surfnet_svm + .simnet_events_tx + .send(SimnetEvent::warn(format!("Metrics init failed: {}", e))); + } + Ok(_) => { + let _ = surfnet_svm.simnet_events_tx.send(SimnetEvent::info(format!( + "Metrics available at http://{}/metrics", + telemetry_config.prometheus_addr + ))); + } + } + } } // Apply feature configuration from CLI flags @@ -457,8 +465,8 @@ fn log_events( let _ = simnet_commands_tx .send(SimnetCommand::CompleteRunbookExecution(runbook_id, errors)); } + #[cfg(feature = "prometheus")] SimnetEvent::MetricsData(metrics_data) => { - #[cfg(feature = "prometheus")] { surfpool_core::telemetry::metrics().record_svm_state( metrics_data.slot, diff --git a/crates/cli/src/tui/simnet.rs b/crates/cli/src/tui/simnet.rs index ca15ddaa..d357275c 100644 --- a/crates/cli/src/tui/simnet.rs +++ b/crates/cli/src/tui/simnet.rs @@ -596,8 +596,8 @@ fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<( ); app.status_bar_message = None; } + #[cfg(feature = "prometheus")] SimnetEvent::MetricsData(metrics_data) => { - #[cfg(feature = "prometheus")] { // In TUI mode, we don't need to record metrics to Prometheus // because the metrics server is running in its own thread diff --git a/crates/core/src/surfnet/locker.rs b/crates/core/src/surfnet/locker.rs index 254d4eb9..2ed24c2c 100644 --- a/crates/core/src/surfnet/locker.rs +++ b/crates/core/src/surfnet/locker.rs @@ -53,7 +53,7 @@ use solana_transaction_status::{ }; use surfpool_types::{ AccountSnapshot, ComputeUnitsEstimationResult, ExecutionCapture, ExportSnapshotConfig, Idl, - KeyedProfileResult, MetricsData, ProfileResult, RpcProfileResultConfig, + KeyedProfileResult, ProfileResult, RpcProfileResultConfig, RunbookExecutionStatusReport, SimnetCommand, SimnetEvent, TransactionConfirmationStatus, TransactionStatusEvent, UiKeyedProfileResult, UuidOrSignature, VersionedIdl, }; @@ -75,6 +75,8 @@ use crate::{ TokenAccount, TransactionLoadedAddresses, TransactionWithStatusMeta, }, }; +#[cfg(feature = "prometheus")] +use surfpool_types::MetricsData; enum ProcessTransactionResult { Success(TransactionMetadata), @@ -1044,21 +1046,24 @@ impl SurfnetSvmLocker { svm_writer.write_executed_profile_result(signature, profile_result) })?; - let metric_data = self.with_svm_reader(|svm| MetricsData { - slot: svm.latest_epoch_info.absolute_slot, - epoch: svm.latest_epoch_info.epoch, - slot_index: svm.latest_epoch_info.slot_index, - transactions_count: svm.transactions.count().unwrap_or(0) as usize, - transactions_processed: svm.transactions_processed, - start_time: svm.start_time, - signature_subs: svm.signature_subscriptions.len(), - account_subs: svm.account_subscriptions.len(), - slot_subs: svm.slot_subscriptions.len(), - logs_subs: svm.logs_subscriptions.len(), - }); - let _ = self - .simnet_events_tx() - .send(SimnetEvent::MetricsData(metric_data)); + #[cfg(feature = "prometheus")] + { + let metric_data = self.with_svm_reader(|svm| MetricsData { + slot: svm.latest_epoch_info.absolute_slot, + epoch: svm.latest_epoch_info.epoch, + slot_index: svm.latest_epoch_info.slot_index, + transactions_count: svm.transactions.count().unwrap_or(0) as usize, + transactions_processed: svm.transactions_processed, + start_time: svm.start_time, + signature_subs: svm.signature_subscriptions.len(), + account_subs: svm.account_subscriptions.len(), + slot_subs: svm.slot_subscriptions.len(), + logs_subs: svm.logs_subscriptions.len(), + }); + let _ = self + .simnet_events_tx() + .send(SimnetEvent::MetricsData(metric_data)); + } Ok(()) } diff --git a/crates/core/src/telemetry.rs b/crates/core/src/telemetry.rs index 863d57fd..9fc630da 100644 --- a/crates/core/src/telemetry.rs +++ b/crates/core/src/telemetry.rs @@ -2,6 +2,7 @@ //! //! Feature `prometheus` enables a `/metrics` HTTP endpoint +#[cfg(feature = "prometheus")] use std::time::SystemTime; #[cfg(feature = "prometheus")] @@ -39,54 +40,18 @@ mod instrumented { impl SurfpoolMetrics { fn new(meter: Meter) -> Self { Self { - slot: meter - .u64_gauge("surfpool_slot") - .with_description("Current slot height") - .build(), - epoch: meter - .u64_gauge("surfpool_epoch") - .with_description("Current epoch") - .build(), - slot_index: meter - .u64_gauge("surfpool_slot_index") - .with_description("Slot index within epoch") - .build(), - transactions_count: meter - .u64_gauge("surfpool_transactions_count") - .with_description("Number of transactions in storage") - .build(), - transactions_processed_total: meter - .u64_gauge("surfpool_transactions_processed_total") - .with_description("Total processed transactions") - .build(), - uptime_seconds: meter - .u64_gauge("surfpool_uptime_seconds") - .with_description("Time since start in seconds") - .build(), - ws_subscriptions_total: meter - .u64_gauge("surfpool_ws_subscriptions_total") - .with_description("Total WebSocket subscriptions") - .build(), - ws_signature_subscriptions: meter - .u64_gauge("surfpool_ws_signature_subscriptions") - .with_description("Signature subscriptions count") - .build(), - ws_account_subscriptions: meter - .u64_gauge("surfpool_ws_account_subscriptions") - .with_description("Account subscriptions count") - .build(), - ws_slot_subscriptions: meter - .u64_gauge("surfpool_ws_slot_subscriptions") - .with_description("Slot subscriptions count") - .build(), - ws_logs_subscriptions: meter - .u64_gauge("surfpool_ws_logs_subscriptions") - .with_description("Logs subscriptions count") - .build(), - transactions_processed: meter - .u64_counter("surfpool_transactions_processed") - .with_description("Transactions processed counter") - .build(), + slot: meter.u64_gauge("surfpool_slot").with_description("Current slot height").build(), + epoch: meter.u64_gauge("surfpool_epoch").with_description("Current epoch").build(), + slot_index: meter.u64_gauge("surfpool_slot_index").with_description("Slot index within epoch").build(), + transactions_count: meter.u64_gauge("surfpool_transactions_count").with_description("Number of transactions in storage").build(), + transactions_processed_total: meter.u64_gauge("surfpool_transactions_processed_total").with_description("Total processed transactions").build(), + uptime_seconds: meter.u64_gauge("surfpool_uptime_seconds").with_description("Time since start in seconds").build(), + ws_subscriptions_total: meter.u64_gauge("surfpool_ws_subscriptions_total").with_description("Total WebSocket subscriptions").build(), + ws_signature_subscriptions: meter.u64_gauge("surfpool_ws_signature_subscriptions").with_description("Signature subscriptions count").build(), + ws_account_subscriptions: meter.u64_gauge("surfpool_ws_account_subscriptions").with_description("Account subscriptions count").build(), + ws_slot_subscriptions: meter.u64_gauge("surfpool_ws_slot_subscriptions").with_description("Slot subscriptions count").build(), + ws_logs_subscriptions: meter.u64_gauge("surfpool_ws_logs_subscriptions").with_description("Logs subscriptions count").build(), + transactions_processed: meter.u64_counter("surfpool_transactions_processed").with_description("Transactions processed counter").build(), } } @@ -111,18 +76,14 @@ mod instrumented { self.slot.record(slot, &[]); self.epoch.record(epoch, &[]); self.slot_index.record(slot_index, &[]); - self.transactions_count - .record(transactions_count as u64, &[]); - self.transactions_processed_total - .record(transactions_processed, &[]); + self.transactions_count.record(transactions_count as u64, &[]); + self.transactions_processed_total.record(transactions_processed, &[]); self.uptime_seconds.record(uptime_secs, &[]); let total_subs = (signature_subs + account_subs + slot_subs + logs_subs) as u64; self.ws_subscriptions_total.record(total_subs, &[]); - self.ws_signature_subscriptions - .record(signature_subs as u64, &[]); - self.ws_account_subscriptions - .record(account_subs as u64, &[]); + self.ws_signature_subscriptions.record(signature_subs as u64, &[]); + self.ws_account_subscriptions.record(account_subs as u64, &[]); self.ws_slot_subscriptions.record(slot_subs as u64, &[]); self.ws_logs_subscriptions.record(logs_subs as u64, &[]); } @@ -138,15 +99,10 @@ mod instrumented { ) -> Result<(), Box> { let service_name_owned = service_name.to_string(); let bind_addr_owned = bind_addr.to_string(); - let mut result = Ok(()); INIT.call_once(|| { - // Create prometheus registry let registry = prometheus::Registry::new(); - - // Create prometheus exporter using the new 0.28 API - // In 0.28, opentelemetry-prometheus uses a different approach let exporter = match opentelemetry_prometheus::exporter() .with_registry(registry.clone()) .build() @@ -158,12 +114,10 @@ mod instrumented { } }; - // Build resource using 0.28 API let resource = Resource::builder() .with_attributes(vec![KeyValue::new("service.name", service_name_owned)]) .build(); - // Build meter provider using 0.28 API let provider = SdkMeterProvider::builder() .with_resource(resource) .with_reader(exporter) @@ -172,33 +126,22 @@ mod instrumented { let meter = provider.meter("surfpool-core"); let metrics = SurfpoolMetrics::new(meter); - if METER_PROVIDER.set(provider).is_err() { - result = Err("Meter provider already initialized".into()); + if let Err(e) = METER_PROVIDER.set(provider) { + result = Err(format!("Meter provider already initialized: {:?}", e).into()); return; } - - if METRICS.set(metrics).is_err() { - result = Err("Metrics already initialized".into()); + if let Err(e) = METRICS.set(metrics) { + result = Err(format!("Metrics already initialized: {:?}", e).into()); return; } - // Spawn HTTP server in a blocking thread std::thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - { + let rt = match tokio::runtime::Builder::new_multi_thread().enable_all().build() { Ok(rt) => rt, - Err(e) => { - eprintln!("Failed to create tokio runtime: {}", e); - return; - } + Err(e) => { eprintln!("Failed to create tokio runtime: {}", e); return; } }; - rt.block_on(async { let registry_clone = registry.clone(); - - // Build axum 0.8 router with new path syntax let app = axum::Router::new().route( "/metrics", axum::routing::get(move || { @@ -206,29 +149,18 @@ mod instrumented { async move { let encoder = prometheus::TextEncoder::new(); let metric_families = reg.gather(); - let mut buffer = vec![]; if let Err(e) = encoder.encode(&metric_families, &mut buffer) { - return ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - format!("Failed to encode: {}", e), - ); + return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to encode: {}", e)); } - - let body = String::from_utf8(buffer) - .unwrap_or_else(|_| "Invalid UTF8".to_string()); - + let body = String::from_utf8(buffer).unwrap_or_else(|_| "Invalid UTF8".to_string()); (axum::http::StatusCode::OK, body) } }), ); - let listener = match tokio::net::TcpListener::bind(&bind_addr_owned).await { Ok(l) => l, - Err(e) => { - eprintln!("Failed to bind: {}", e); - return; - } + Err(e) => { eprintln!("Failed to bind: {}", e); return; } }; if let Err(e) = axum::serve(listener, app).await { eprintln!("Server error: {}", e); @@ -241,9 +173,7 @@ mod instrumented { } pub fn metrics() -> &'static SurfpoolMetrics { - METRICS - .get() - .expect("telemetry not initialized. Call init_prometheus() first") + METRICS.get().expect("telemetry not initialized. Call init_prometheus() first") } pub fn shutdown() { @@ -253,64 +183,16 @@ mod instrumented { } } -#[cfg(not(feature = "prometheus"))] -mod instrumented { - use super::*; - - pub struct SurfpoolMetrics; - - impl SurfpoolMetrics { - pub fn record_svm_state( - &self, - _slot: u64, - _epoch: u64, - _slot_index: u64, - _transactions_count: usize, - _transactions_processed: u64, - _start_time: SystemTime, - _signature_subs: usize, - _account_subs: usize, - _slot_subs: usize, - _logs_subs: usize, - ) { - } - pub fn increment_transactions_processed(&self, _count: u64) {} - } - - pub fn init_prometheus( - _service_name: &str, - _bind_addr: &str, - ) -> Result<(), Box> { - Ok(()) - } - - pub fn metrics() -> SurfpoolMetrics { - SurfpoolMetrics - } - - pub fn shutdown() {} -} - +#[cfg(feature = "prometheus")] pub use instrumented::*; +#[cfg(feature = "prometheus")] pub fn init_from_config(enabled: bool, bind_addr: &str) -> Result<(), String> { - #[cfg(feature = "prometheus")] - { - if !enabled { - log::info!("Prometheus metrics disabled"); - return Ok(()); - } - log::info!("Starting Prometheus metrics on {}", bind_addr); - init_prometheus("surfpool", bind_addr) - .map_err(|e| format!("Prometheus init failed: {}", e))?; - } - #[cfg(not(feature = "prometheus"))] - { - if enabled { - log::warn!( - "Prometheus enabled but feature not compiled in. Rebuild with --features prometheus" - ); - } + if !enabled { + log::info!("Prometheus metrics disabled"); + return Ok(()); } - Ok(()) -} + log::info!("Starting Prometheus metrics on {}", bind_addr); + init_prometheus("surfpool", bind_addr) + .map_err(|e| format!("Prometheus init failed: {}", e)) +} \ No newline at end of file diff --git a/crates/types/src/types.rs b/crates/types/src/types.rs index 6cc75ab6..5ba7a874 100644 --- a/crates/types/src/types.rs +++ b/crates/types/src/types.rs @@ -418,6 +418,7 @@ pub enum SubgraphCommand { #[derive(Debug)] pub enum SimnetEvent { /// Surfnet is ready, with the initial count of processed transactions from storage + #[cfg(feature = "prometheus")] MetricsData(MetricsData), Ready(u64), Connected(String), @@ -602,6 +603,7 @@ pub struct SurfpoolConfig { pub subgraph: SubgraphConfig, pub studio: StudioConfig, pub plugin_config_path: Vec, + #[cfg(feature = "prometheus")] #[serde(default)] pub telemetry: TelemetryConfig, } @@ -1281,6 +1283,7 @@ pub struct SurfpoolStatus { pub ws_subscriptions: WsSubscriptions, } +#[cfg(feature = "prometheus")] #[derive(Clone, Debug)] pub struct MetricsData { pub slot: u64, @@ -1295,13 +1298,15 @@ pub struct MetricsData { pub logs_subs: usize, } +#[cfg(feature = "prometheus")] #[derive(Clone, Debug, Default, Serialize, Deserialize)] pub struct TelemetryConfig { pub enabled: bool, #[serde(default = "default_prometheus_addr")] - pub prometheus_addr: String, // Just String, not Option + pub prometheus_addr: String, } +#[cfg(feature = "prometheus")] fn default_prometheus_addr() -> String { "0.0.0.0:9000".to_string() } From 811e8382921766c183c3a55ea03bef49056cfb86 Mon Sep 17 00:00:00 2001 From: Danish Ansari Date: Fri, 6 Mar 2026 10:42:52 +0530 Subject: [PATCH 3/3] fix:format the code --- crates/cli/src/cli/mod.rs | 5 +- crates/cli/src/cli/simnet/mod.rs | 26 ++++---- crates/core/src/surfnet/locker.rs | 10 +-- crates/core/src/telemetry.rs | 104 +++++++++++++++++++++++------- 4 files changed, 98 insertions(+), 47 deletions(-) diff --git a/crates/cli/src/cli/mod.rs b/crates/cli/src/cli/mod.rs index 2c2037d9..cac92df3 100644 --- a/crates/cli/src/cli/mod.rs +++ b/crates/cli/src/cli/mod.rs @@ -20,6 +20,8 @@ use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_signer::{EncodableKey, Signer}; use surfpool_mcp::McpOptions; +#[cfg(feature = "prometheus")] +use surfpool_types::TelemetryConfig; use surfpool_types::{ AccountSnapshot, BlockProductionMode, CHANGE_TO_DEFAULT_STUDIO_PORT_ONCE_SUPERVISOR_MERGED, DEFAULT_DEVNET_RPC_URL, DEFAULT_GOSSIP_PORT, DEFAULT_MAINNET_RPC_URL, DEFAULT_NETWORK_HOST, @@ -31,9 +33,6 @@ use txtx_cloud::LoginCommand; use txtx_core::manifest::WorkspaceManifest; use txtx_gql::kit::{helpers::fs::FileLocation, types::frontend::LogLevel}; -#[cfg(feature = "prometheus")] -use surfpool_types::TelemetryConfig; - use crate::{cloud::CloudStartCommand, runbook::handle_execute_runbook_command}; mod simnet; diff --git a/crates/cli/src/cli/simnet/mod.rs b/crates/cli/src/cli/simnet/mod.rs index 321bc99c..4ea19947 100644 --- a/crates/cli/src/cli/simnet/mod.rs +++ b/crates/cli/src/cli/simnet/mod.rs @@ -467,20 +467,18 @@ fn log_events( } #[cfg(feature = "prometheus")] SimnetEvent::MetricsData(metrics_data) => { - { - surfpool_core::telemetry::metrics().record_svm_state( - metrics_data.slot, - metrics_data.epoch, - metrics_data.slot_index, - metrics_data.transactions_count, - metrics_data.transactions_processed, - metrics_data.start_time, - metrics_data.signature_subs, - metrics_data.account_subs, - metrics_data.slot_subs, - metrics_data.logs_subs, - ); - } + surfpool_core::telemetry::metrics().record_svm_state( + metrics_data.slot, + metrics_data.epoch, + metrics_data.slot_index, + metrics_data.transactions_count, + metrics_data.transactions_processed, + metrics_data.start_time, + metrics_data.signature_subs, + metrics_data.account_subs, + metrics_data.slot_subs, + metrics_data.logs_subs, + ); } }, Err(_e) => { diff --git a/crates/core/src/surfnet/locker.rs b/crates/core/src/surfnet/locker.rs index 2ed24c2c..d23183cb 100644 --- a/crates/core/src/surfnet/locker.rs +++ b/crates/core/src/surfnet/locker.rs @@ -51,11 +51,13 @@ use solana_transaction_status::{ TransactionConfirmationStatus as SolanaTransactionConfirmationStatus, UiConfirmedBlock, UiTransactionEncoding, }; +#[cfg(feature = "prometheus")] +use surfpool_types::MetricsData; use surfpool_types::{ AccountSnapshot, ComputeUnitsEstimationResult, ExecutionCapture, ExportSnapshotConfig, Idl, - KeyedProfileResult, ProfileResult, RpcProfileResultConfig, - RunbookExecutionStatusReport, SimnetCommand, SimnetEvent, TransactionConfirmationStatus, - TransactionStatusEvent, UiKeyedProfileResult, UuidOrSignature, VersionedIdl, + KeyedProfileResult, ProfileResult, RpcProfileResultConfig, RunbookExecutionStatusReport, + SimnetCommand, SimnetEvent, TransactionConfirmationStatus, TransactionStatusEvent, + UiKeyedProfileResult, UuidOrSignature, VersionedIdl, }; use tokio::sync::RwLock; use txtx_addon_kit::indexmap::IndexSet; @@ -75,8 +77,6 @@ use crate::{ TokenAccount, TransactionLoadedAddresses, TransactionWithStatusMeta, }, }; -#[cfg(feature = "prometheus")] -use surfpool_types::MetricsData; enum ProcessTransactionResult { Success(TransactionMetadata), diff --git a/crates/core/src/telemetry.rs b/crates/core/src/telemetry.rs index 9fc630da..d169857d 100644 --- a/crates/core/src/telemetry.rs +++ b/crates/core/src/telemetry.rs @@ -40,18 +40,54 @@ mod instrumented { impl SurfpoolMetrics { fn new(meter: Meter) -> Self { Self { - slot: meter.u64_gauge("surfpool_slot").with_description("Current slot height").build(), - epoch: meter.u64_gauge("surfpool_epoch").with_description("Current epoch").build(), - slot_index: meter.u64_gauge("surfpool_slot_index").with_description("Slot index within epoch").build(), - transactions_count: meter.u64_gauge("surfpool_transactions_count").with_description("Number of transactions in storage").build(), - transactions_processed_total: meter.u64_gauge("surfpool_transactions_processed_total").with_description("Total processed transactions").build(), - uptime_seconds: meter.u64_gauge("surfpool_uptime_seconds").with_description("Time since start in seconds").build(), - ws_subscriptions_total: meter.u64_gauge("surfpool_ws_subscriptions_total").with_description("Total WebSocket subscriptions").build(), - ws_signature_subscriptions: meter.u64_gauge("surfpool_ws_signature_subscriptions").with_description("Signature subscriptions count").build(), - ws_account_subscriptions: meter.u64_gauge("surfpool_ws_account_subscriptions").with_description("Account subscriptions count").build(), - ws_slot_subscriptions: meter.u64_gauge("surfpool_ws_slot_subscriptions").with_description("Slot subscriptions count").build(), - ws_logs_subscriptions: meter.u64_gauge("surfpool_ws_logs_subscriptions").with_description("Logs subscriptions count").build(), - transactions_processed: meter.u64_counter("surfpool_transactions_processed").with_description("Transactions processed counter").build(), + slot: meter + .u64_gauge("surfpool_slot") + .with_description("Current slot height") + .build(), + epoch: meter + .u64_gauge("surfpool_epoch") + .with_description("Current epoch") + .build(), + slot_index: meter + .u64_gauge("surfpool_slot_index") + .with_description("Slot index within epoch") + .build(), + transactions_count: meter + .u64_gauge("surfpool_transactions_count") + .with_description("Number of transactions in storage") + .build(), + transactions_processed_total: meter + .u64_gauge("surfpool_transactions_processed_total") + .with_description("Total processed transactions") + .build(), + uptime_seconds: meter + .u64_gauge("surfpool_uptime_seconds") + .with_description("Time since start in seconds") + .build(), + ws_subscriptions_total: meter + .u64_gauge("surfpool_ws_subscriptions_total") + .with_description("Total WebSocket subscriptions") + .build(), + ws_signature_subscriptions: meter + .u64_gauge("surfpool_ws_signature_subscriptions") + .with_description("Signature subscriptions count") + .build(), + ws_account_subscriptions: meter + .u64_gauge("surfpool_ws_account_subscriptions") + .with_description("Account subscriptions count") + .build(), + ws_slot_subscriptions: meter + .u64_gauge("surfpool_ws_slot_subscriptions") + .with_description("Slot subscriptions count") + .build(), + ws_logs_subscriptions: meter + .u64_gauge("surfpool_ws_logs_subscriptions") + .with_description("Logs subscriptions count") + .build(), + transactions_processed: meter + .u64_counter("surfpool_transactions_processed") + .with_description("Transactions processed counter") + .build(), } } @@ -76,14 +112,18 @@ mod instrumented { self.slot.record(slot, &[]); self.epoch.record(epoch, &[]); self.slot_index.record(slot_index, &[]); - self.transactions_count.record(transactions_count as u64, &[]); - self.transactions_processed_total.record(transactions_processed, &[]); + self.transactions_count + .record(transactions_count as u64, &[]); + self.transactions_processed_total + .record(transactions_processed, &[]); self.uptime_seconds.record(uptime_secs, &[]); let total_subs = (signature_subs + account_subs + slot_subs + logs_subs) as u64; self.ws_subscriptions_total.record(total_subs, &[]); - self.ws_signature_subscriptions.record(signature_subs as u64, &[]); - self.ws_account_subscriptions.record(account_subs as u64, &[]); + self.ws_signature_subscriptions + .record(signature_subs as u64, &[]); + self.ws_account_subscriptions + .record(account_subs as u64, &[]); self.ws_slot_subscriptions.record(slot_subs as u64, &[]); self.ws_logs_subscriptions.record(logs_subs as u64, &[]); } @@ -136,9 +176,15 @@ mod instrumented { } std::thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_multi_thread().enable_all().build() { + let rt = match tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + { Ok(rt) => rt, - Err(e) => { eprintln!("Failed to create tokio runtime: {}", e); return; } + Err(e) => { + eprintln!("Failed to create tokio runtime: {}", e); + return; + } }; rt.block_on(async { let registry_clone = registry.clone(); @@ -151,16 +197,23 @@ mod instrumented { let metric_families = reg.gather(); let mut buffer = vec![]; if let Err(e) = encoder.encode(&metric_families, &mut buffer) { - return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to encode: {}", e)); + return ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to encode: {}", e), + ); } - let body = String::from_utf8(buffer).unwrap_or_else(|_| "Invalid UTF8".to_string()); + let body = String::from_utf8(buffer) + .unwrap_or_else(|_| "Invalid UTF8".to_string()); (axum::http::StatusCode::OK, body) } }), ); let listener = match tokio::net::TcpListener::bind(&bind_addr_owned).await { Ok(l) => l, - Err(e) => { eprintln!("Failed to bind: {}", e); return; } + Err(e) => { + eprintln!("Failed to bind: {}", e); + return; + } }; if let Err(e) = axum::serve(listener, app).await { eprintln!("Server error: {}", e); @@ -173,7 +226,9 @@ mod instrumented { } pub fn metrics() -> &'static SurfpoolMetrics { - METRICS.get().expect("telemetry not initialized. Call init_prometheus() first") + METRICS + .get() + .expect("telemetry not initialized. Call init_prometheus() first") } pub fn shutdown() { @@ -193,6 +248,5 @@ pub fn init_from_config(enabled: bool, bind_addr: &str) -> Result<(), String> { return Ok(()); } log::info!("Starting Prometheus metrics on {}", bind_addr); - init_prometheus("surfpool", bind_addr) - .map_err(|e| format!("Prometheus init failed: {}", e)) -} \ No newline at end of file + init_prometheus("surfpool", bind_addr).map_err(|e| format!("Prometheus init failed: {}", e)) +}