diff --git a/bin/network-monitor/assets/index.js b/bin/network-monitor/assets/index.js index 842a9567c..c76347cff 100644 --- a/bin/network-monitor/assets/index.js +++ b/bin/network-monitor/assets/index.js @@ -643,8 +643,8 @@ function updateDisplay() { ` : ''} ${details.RemoteProverStatus?.test ? (() => { - const t = details.RemoteProverStatus.test; - const ts = details.RemoteProverStatus.test_status; + const t = details.RemoteProverStatus.test.details; + const ts = details.RemoteProverStatus.test.status; return `
Proof Generation Testing (${t.proof_type}): diff --git a/bin/network-monitor/src/commands/start.rs b/bin/network-monitor/src/commands/start.rs index 924df7e46..777175fbb 100644 --- a/bin/network-monitor/src/commands/start.rs +++ b/bin/network-monitor/src/commands/start.rs @@ -4,12 +4,14 @@ use anyhow::Result; use miden_node_utils::logging::OpenTelemetry; +use tokio::sync::watch; use tracing::{debug, info, instrument, warn}; use crate::COMPONENT; use crate::config::MonitorConfig; use crate::frontend::ServerState; use crate::monitor::tasks::Tasks; +use crate::status::ServiceStatus; /// Start the network monitoring service. /// @@ -92,15 +94,31 @@ pub async fn start_monitor(config: MonitorConfig) -> Result<()> { // Initialize HTTP server. debug!(target: COMPONENT, "Initializing HTTP server"); + + // Build the flat services Vec in the order the dashboard expects to render cards. + let mut services: Vec<watch::Receiver<ServiceStatus>> = vec![rpc_rx]; + if let Some(rx) = faucet_rx { + services.push(rx); + } + services.extend(prover_rxs); + if let Some(rx) = explorer_rx { + services.push(rx); + } + if let Some(rx) = ntx_increment_rx { + services.push(rx); + } + if let Some(rx) = ntx_tracking_rx { + services.push(rx); + } + if let Some(rx) = note_transport_rx { + services.push(rx); + } + if let Some(rx) = validator_rx { + services.push(rx); + } + let server_state = ServerState { - rpc: rpc_rx, - provers: prover_rxs, - faucet: faucet_rx, - ntx_increment: ntx_increment_rx, - ntx_tracking: ntx_tracking_rx, - explorer: explorer_rx, - note_transport: note_transport_rx, - validator: validator_rx, + services, monitor_version: env!("CARGO_PKG_VERSION").to_string(), network_name: config.network_name.clone(), }; diff --git a/bin/network-monitor/src/frontend.rs b/bin/network-monitor/src/frontend.rs index 320a2efe9..dceafb642 100644 --- a/bin/network-monitor/src/frontend.rs +++ b/bin/network-monitor/src/frontend.rs @@ -12,22 +12,19 @@ use tracing::{info, instrument}; use crate::COMPONENT; use 
crate::config::MonitorConfig; -use crate::status::{NetworkStatus, RemoteProverDetails, ServiceDetails, ServiceStatus, Status}; +use crate::status::{NetworkStatus, ServiceStatus}; // SERVER STATE // ================================================================================================ /// State for the web server containing watch receivers for all services. +/// +/// Each entry in `services` is a `ServiceStatus` channel. The frontend simply snapshots every +/// entry on each `/status` request. Adding a new service is just pushing another receiver into +/// this Vec at startup; no changes to this struct or `get_status` are required. #[derive(Clone)] pub struct ServerState { - pub rpc: watch::Receiver<ServiceStatus>, - pub provers: Vec<(watch::Receiver<ServiceStatus>, watch::Receiver<ServiceStatus>)>, - pub faucet: Option<watch::Receiver<ServiceStatus>>, - pub ntx_increment: Option<watch::Receiver<ServiceStatus>>, - pub ntx_tracking: Option<watch::Receiver<ServiceStatus>>, - pub explorer: Option<watch::Receiver<ServiceStatus>>, - pub note_transport: Option<watch::Receiver<ServiceStatus>>, - pub validator: Option<watch::Receiver<ServiceStatus>>, + pub services: Vec<watch::Receiver<ServiceStatus>>, pub monitor_version: String, pub network_name: String, } @@ -71,59 +68,20 @@ async fn get_dashboard() -> Html<&'static str> { async fn get_status( axum::extract::State(server_state): axum::extract::State<ServerState>, ) -> axum::response::Json<NetworkStatus> { - let current_time = SystemTime::now() + let services: Vec<ServiceStatus> = + server_state.services.iter().map(|rx| rx.borrow().clone()).collect(); + + let last_updated = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_else(|_| Duration::from_secs(0)) .as_secs(); - let mut services = Vec::new(); - - // Collect RPC status - services.push(server_state.rpc.borrow().clone()); - - // Collect faucet status if available - if let Some(faucet_rx) = &server_state.faucet { - services.push(faucet_rx.borrow().clone()); - } - - // Collect all remote prover statuses, merging status + test into a single entry per prover. 
- for (prover_status_rx, prover_test_rx) in &server_state.provers { - services.push(merge_prover(&prover_status_rx.borrow(), &prover_test_rx.borrow())); - } - - // Collect explorer status if available - if let Some(explorer_rx) = &server_state.explorer { - services.push(explorer_rx.borrow().clone()); - } - - // Collect counter increment status if enabled - if let Some(ntx_increment_rx) = &server_state.ntx_increment { - services.push(ntx_increment_rx.borrow().clone()); - } - - // Collect counter tracking status if enabled - if let Some(ntx_tracking_rx) = &server_state.ntx_tracking { - services.push(ntx_tracking_rx.borrow().clone()); - } - - // Collect note transport status if available - if let Some(note_transport_rx) = &server_state.note_transport { - services.push(note_transport_rx.borrow().clone()); - } - - // Collect validator status if available - if let Some(validator_rx) = &server_state.validator { - services.push(validator_rx.borrow().clone()); - } - - let network_status = NetworkStatus { + axum::response::Json(NetworkStatus { services, - last_updated: current_time, + last_updated, monitor_version: server_state.monitor_version.clone(), network_name: server_state.network_name.clone(), - }; - - axum::response::Json(network_status) + }) } async fn serve_css() -> Response { @@ -149,50 +107,3 @@ async fn serve_favicon() -> Response { ) .into_response() } - -/// Merges the status and test receivers for a single remote prover into one `ServiceStatus`. -/// -/// The combined status is `Unhealthy` if either the status check or the test failed, `Unknown` -/// if the status checker has not yet seen the prover, and `Healthy` otherwise. The test result -/// is only attached when the test task has produced an actual `RemoteProverTest` result (before -/// the first test completes, the test channel holds the initial prover status and should not be -/// surfaced as a test). 
-fn merge_prover(status: &ServiceStatus, test: &ServiceStatus) -> ServiceStatus { - // Extract prover status details, or pass through the raw status if the prover is down - // (details will be `ServiceDetails::Error` in that case). - let status_details = match &status.details { - ServiceDetails::ProverStatusCheck(d) => d.clone(), - _ => return status.clone(), - }; - - // Only attach test details once the test task has produced a real result. - let (test_details, test_status, test_error) = match &test.details { - ServiceDetails::ProverTestResult(d) => { - (Some(d.clone()), Some(test.status.clone()), test.error.clone()) - }, - _ => (None, None, None), - }; - - let details = ServiceDetails::RemoteProverStatus(RemoteProverDetails { - status: status_details, - test: test_details, - test_status: test_status.clone(), - test_error: test_error.clone(), - }); - - let name = &status.name; - let base = match (&status.status, &test_status) { - (Status::Unhealthy, _) | (_, Some(Status::Unhealthy)) => { - let error = status - .error - .clone() - .or(test_error) - .unwrap_or_else(|| "prover is unhealthy".to_string()); - ServiceStatus::unhealthy(name, error, details) - }, - (Status::Unknown, _) => ServiceStatus::unknown(name, details), - _ => ServiceStatus::healthy(name, details), - }; - - base.with_last_checked(status.last_checked) -} diff --git a/bin/network-monitor/src/monitor/tasks.rs b/bin/network-monitor/src/monitor/tasks.rs index 1db8e6525..422d85b30 100644 --- a/bin/network-monitor/src/monitor/tasks.rs +++ b/bin/network-monitor/src/monitor/tasks.rs @@ -14,6 +14,7 @@ use tokio::sync::watch::Receiver; use tokio::sync::{Mutex, watch}; use tokio::task::{Id, JoinSet}; use tracing::{debug, instrument}; +use url::Url; use crate::COMPONENT; use crate::config::MonitorConfig; @@ -23,7 +24,13 @@ use crate::explorer::{initial_explorer_status, run_explorer_status_task}; use crate::faucet::{FaucetTestDetails, run_faucet_test_task}; use crate::frontend::{ServerState, serve}; use 
crate::note_transport::{initial_note_transport_status, run_note_transport_status_task}; -use crate::remote_prover::{ProofType, generate_prover_test_payload, run_remote_prover_test_task}; +use crate::remote_prover::{ + ProofType, + generate_prover_test_payload, + merge_prover, + run_prover_combiner, + run_remote_prover_test_task, +}; use crate::status::{ CounterTrackingDetails, IncrementDetails, @@ -225,114 +232,124 @@ impl Tasks { pub async fn spawn_prover_tasks( &mut self, config: &MonitorConfig, - ) -> Result, watch::Receiver)>> { + ) -> Result>> { debug!(target: COMPONENT, prover_count = config.remote_prover_urls.len(), "Spawning prover tasks"); let mut prover_rxs = Vec::new(); - for (i, prover_url) in config.remote_prover_urls.iter().enumerate() { - let name = format!("Remote Prover ({})", i + 1); - - let mut remote_prover = ClientBuilder::new(prover_url.clone()) - .with_tls() - .expect("TLS is enabled") - .with_timeout(config.request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); - - let initial_prover_status = check_remote_prover_status( - &mut remote_prover, - name.clone(), - prover_url.to_string(), - ) - .await; - - let (prover_status_tx, prover_status_rx) = - watch::channel(initial_prover_status.clone()); - - // Spawn the remote prover status check task - let component_name = format!("prover-checker-{}", i + 1); + prover_rxs.push(self.spawn_single_prover(config, i, prover_url).await); + } + debug!(target: COMPONENT, spawned_provers = prover_rxs.len(), "All prover tasks spawned successfully"); + Ok(prover_rxs) + } + + /// Spawns the status checker, optional test task, and combiner task for a single prover. + /// Returns the receiver of the merged status channel. 
+ async fn spawn_single_prover( + &mut self, + config: &MonitorConfig, + index: usize, + prover_url: &Url, + ) -> watch::Receiver<ServiceStatus> { + let name = format!("Remote Prover ({})", index + 1); + + let mut remote_prover = ClientBuilder::new(prover_url.clone()) + .with_tls() + .expect("TLS is enabled") + .with_timeout(config.request_timeout) + .without_metadata_version() + .without_metadata_genesis() + .without_otel_context_injection() + .connect_lazy::(); + + let initial_prover_status = + check_remote_prover_status(&mut remote_prover, name.clone(), prover_url.to_string()) + .await; + + let (prover_status_tx, prover_status_rx) = watch::channel(initial_prover_status.clone()); + + // Spawn the remote prover status check task + let prover_url_clone = prover_url.clone(); + let name_clone = name.clone(); + let status_check_interval = config.status_check_interval; + let request_timeout = config.request_timeout; + let id = self + .handles + .spawn(async move { + run_remote_prover_status_task( + prover_url_clone, + name_clone, + prover_status_tx, + status_check_interval, + request_timeout, + ) + .await; + }) + .id(); + self.names.insert(id, format!("prover-checker-{}", index + 1)); + + // Extract proof_type directly from the service status. If the prover is not available + // during startup, skip spawning test tasks. + let proof_type = if let ServiceDetails::ProverStatusCheck(details) = + &initial_prover_status.details + { + Some(details.supported_proof_type.clone()) + } else { + tracing::warn!( + "Prover {name} is not available during startup, skipping test task initialization" + ); + None + }; + + // Only spawn test tasks for transaction provers; others get a dummy closed channel. 
+ let prover_test_rx = if matches!(proof_type, Some(ProofType::Transaction)) { + debug!("Starting transaction proof tests for prover: {name}"); + let payload = generate_prover_test_payload().await; + let (prover_test_tx, prover_test_rx) = watch::channel(initial_prover_status.clone()); + let prover_url_clone = prover_url.clone(); let name_clone = name.clone(); - let status_check_interval = config.status_check_interval; - let request_timeout = config.request_timeout; + let proof_type = proof_type.expect("proof type is Some"); + let remote_prover_interval = config.remote_prover_test_interval; + let id = self .handles .spawn(async move { - run_remote_prover_status_task( + run_remote_prover_test_task( prover_url_clone, - name_clone, - prover_status_tx, - status_check_interval, + &name_clone, + proof_type, + payload, + prover_test_tx, request_timeout, + remote_prover_interval, ) .await; }) .id(); - self.names.insert(id, component_name); - - // Extract proof_type directly from the service status - // If the prover is not available during startup, skip spawning test tasks - let proof_type = if let ServiceDetails::ProverStatusCheck(details) = - &initial_prover_status.details - { - Some(details.supported_proof_type.clone()) - } else { - // Prover is not available during startup, but we'll still monitor its status - tracing::warn!( - "Prover {} is not available during startup, skipping test task initialization", - name - ); - None - }; - - // Only spawn test tasks for transaction provers if proof_type is available - let prover_test_rx = if matches!(proof_type, Some(ProofType::Transaction)) { - debug!("Starting transaction proof tests for prover: {}", name); - let payload = generate_prover_test_payload().await; - let (prover_test_tx, prover_test_rx) = - watch::channel(initial_prover_status.clone()); - - let prover_url_clone = prover_url.clone(); - let name_clone = name.clone(); - let proof_type = proof_type.expect("proof type is Some"); - let remote_prover_interval = 
config.remote_prover_test_interval; - - let id = self - .handles - .spawn(async move { - run_remote_prover_test_task( - prover_url_clone, - &name_clone, - proof_type, - payload, - prover_test_tx, - request_timeout, - remote_prover_interval, - ) - .await; - }) - .id(); - let component_name = format!("prover-test-{}", i + 1); - self.names.insert(id, component_name); - - prover_test_rx - } else { - debug!( - "Skipping prover tests for {} (supports {:?} proofs, only testing Transaction proofs)", - name, proof_type - ); - // For non-transaction provers, create a dummy receiver with no test task - let (_tx, rx) = watch::channel(initial_prover_status.clone()); - rx - }; - - prover_rxs.push((prover_status_rx, prover_test_rx)); - } + self.names.insert(id, format!("prover-test-{}", index + 1)); + + prover_test_rx + } else { + debug!( + "Skipping prover tests for {name} (supports {proof_type:?} proofs, only testing Transaction proofs)" + ); + let (_tx, rx) = watch::channel(initial_prover_status.clone()); + rx + }; - debug!(target: COMPONENT, spawned_provers = prover_rxs.len(), "All prover tasks spawned successfully"); - Ok(prover_rxs) + // Spawn a combiner task that merges the status and test receivers into a single + // `ServiceStatus` channel, which is what `ServerState` consumes. + let initial_merged = merge_prover(&prover_status_rx.borrow(), &prover_test_rx.borrow()); + let (merged_tx, merged_rx) = watch::channel(initial_merged); + let id = self + .handles + .spawn(async move { + run_prover_combiner(prover_status_rx, prover_test_rx, merged_tx).await; + }) + .id(); + self.names.insert(id, format!("prover-combiner-{}", index + 1)); + + merged_rx } /// Spawn the faucet testing task. 
diff --git a/bin/network-monitor/src/remote_prover.rs b/bin/network-monitor/src/remote_prover.rs index 27d0a9970..795dd3397 100644 --- a/bin/network-monitor/src/remote_prover.rs +++ b/bin/network-monitor/src/remote_prover.rs @@ -23,7 +23,13 @@ use tracing::{info, instrument}; use url::Url; use crate::COMPONENT; -use crate::status::{ServiceDetails, ServiceStatus}; +use crate::status::{ + ProverTestOutcome, + RemoteProverDetails, + ServiceDetails, + ServiceStatus, + Status, +}; // PROOF TYPE // ================================================================================================ @@ -326,3 +332,83 @@ pub(crate) async fn generate_prover_test_payload() -> proto::remote_prover::Proo payload: generate_mock_transaction().await.unwrap().to_bytes(), } } + +// PROVER MERGE +// ================================================================================================ + +/// Merges the status and test receivers for a single remote prover into one `ServiceStatus`. +/// +/// The combined status is `Unhealthy` if either the status check or the test failed, `Unknown` +/// if the status checker has not yet seen the prover, and `Healthy` otherwise. The test result +/// is only attached when the test task has produced an actual `ProverTestResult` value (before +/// the first test completes, the test channel holds the initial prover status and should not be +/// surfaced as a test). +pub(crate) fn merge_prover(status: &ServiceStatus, test: &ServiceStatus) -> ServiceStatus { + // If the status checker hasn't produced a real result (prover is down), pass its error + // status straight through. + let ServiceDetails::ProverStatusCheck(status_details) = &status.details else { + return status.clone(); + }; + + // Attach test info only once the test task has produced a real result. 
+ let test_outcome = if let ServiceDetails::ProverTestResult(d) = &test.details { + Some(ProverTestOutcome { + details: d.clone(), + status: test.status.clone(), + }) + } else { + None + }; + let test_unhealthy = test_outcome.as_ref().is_some_and(|t| t.status == Status::Unhealthy); + let test_error = test_outcome.as_ref().and_then(|_| test.error.clone()); + + let details = ServiceDetails::RemoteProverStatus(RemoteProverDetails { + status: status_details.clone(), + test: test_outcome, + }); + + let result = if status.status == Status::Unhealthy || test_unhealthy { + let err = status + .error + .clone() + .or(test_error) + .unwrap_or_else(|| "prover is unhealthy".to_string()); + ServiceStatus::unhealthy(&status.name, err, details) + } else if status.status == Status::Unknown { + ServiceStatus::unknown(&status.name, details) + } else { + ServiceStatus::healthy(&status.name, details) + }; + + result.with_last_checked(status.last_checked) +} + +/// Watches a prover's status and test channels, publishing the merged [`ServiceStatus`] on every +/// upstream change. +/// +/// Exits when the output channel has no receivers or when the upstream status channel's sender is +/// dropped. If the test channel's sender is dropped (e.g. a non-transaction prover with no test +/// task), the combiner falls back to watching the status channel only. +pub(crate) async fn run_prover_combiner( + mut status_rx: watch::Receiver<ServiceStatus>, + mut test_rx: watch::Receiver<ServiceStatus>, + merged_tx: watch::Sender<ServiceStatus>, +) { + let mut test_alive = true; + loop { + let merged = merge_prover(&status_rx.borrow(), &test_rx.borrow()); + if merged_tx.send(merged).is_err() { + info!("No receivers for merged prover status updates, shutting down"); + return; + } + + if test_alive { + tokio::select! 
{ + r = status_rx.changed() => if r.is_err() { return; }, + r = test_rx.changed() => if r.is_err() { test_alive = false; }, + } + } else if status_rx.changed().await.is_err() { + return; + } + } +} diff --git a/bin/network-monitor/src/service_status.rs b/bin/network-monitor/src/service_status.rs index 689b976bf..65281cb11 100644 --- a/bin/network-monitor/src/service_status.rs +++ b/bin/network-monitor/src/service_status.rs @@ -148,9 +148,14 @@ pub enum ServiceDetails { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RemoteProverDetails { pub status: RemoteProverStatusDetails, - pub test: Option<ProverTestDetails>, - pub test_status: Option<Status>, - pub test_error: Option<String>, + pub test: Option<ProverTestOutcome>, +} + +/// Most recent outcome of a remote prover test task. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProverTestOutcome { + pub details: ProverTestDetails, + pub status: Status, +} /// Details of the increment service.