From e4ab3c643d8d56796fdc5a11b2481a3d9e4ff8c1 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Fri, 24 Apr 2026 17:11:32 -0300 Subject: [PATCH] chore: add Service trait in monitor --- bin/network-monitor/src/commands/start.rs | 56 +- bin/network-monitor/src/counter.rs | 1476 ++++++++++----------- bin/network-monitor/src/explorer.rs | 158 +-- bin/network-monitor/src/faucet.rs | 127 +- bin/network-monitor/src/main.rs | 1 + bin/network-monitor/src/monitor/tasks.rs | 532 ++------ bin/network-monitor/src/note_transport.rs | 131 +- bin/network-monitor/src/remote_prover.rs | 469 +++---- bin/network-monitor/src/service.rs | 71 + bin/network-monitor/src/service_status.rs | 7 +- bin/network-monitor/src/status.rs | 317 ++--- bin/network-monitor/src/validator.rs | 107 +- 12 files changed, 1451 insertions(+), 2001 deletions(-) create mode 100644 bin/network-monitor/src/service.rs diff --git a/bin/network-monitor/src/commands/start.rs b/bin/network-monitor/src/commands/start.rs index c89df5d32..9b9499daa 100644 --- a/bin/network-monitor/src/commands/start.rs +++ b/bin/network-monitor/src/commands/start.rs @@ -4,7 +4,7 @@ use anyhow::Result; use miden_node_utils::logging::OpenTelemetry; -use tracing::{debug, info, instrument, warn}; +use tracing::{info, instrument}; use crate::COMPONENT; use crate::config::MonitorConfig; @@ -26,7 +26,6 @@ use crate::monitor::tasks::Tasks; err )] pub async fn start_monitor(config: MonitorConfig) -> Result<()> { - // Load configuration from command-line arguments and environment variables info!("Loaded configuration: {:?}", config); let _otel_guard = if config.enable_otel { @@ -37,61 +36,32 @@ pub async fn start_monitor(config: MonitorConfig) -> Result<()> { let mut tasks = Tasks::new(); - // Initialize the RPC Status endpoint checker task. 
- debug!(target: COMPONENT, "Initializing RPC status checker"); - let rpc_rx = tasks.spawn_rpc_checker(&config).await?; + let rpc_rx = tasks.spawn_rpc_checker(&config); - // Initialize the explorer status checker task. - let explorer_rx = if config.explorer_url.is_some() { - Some(tasks.spawn_explorer_checker(&config).await?) - } else { - None - }; - - // Initialize the note transport status checker task. - let note_transport_rx = if config.note_transport_url.is_some() { - Some(tasks.spawn_note_transport_checker(&config).await?) - } else { - None - }; - - // Initialize the validator status checker task. - let validator_rx = if config.validator_url.is_some() { - Some(tasks.spawn_validator_checker(&config).await?) - } else { - None - }; - - // Initialize the prover checkers & tests tasks, only if URLs were provided. let prover_rxs = if config.remote_prover_urls.is_empty() { - debug!(target: COMPONENT, "No remote prover URLs configured, skipping prover tasks"); Vec::new() } else { - debug!(target: COMPONENT, "Initializing prover checkers and tests"); - tasks.spawn_prover_tasks(&config).await? + tasks.spawn_prover_tasks(&config).await }; - // Initialize the faucet testing task. - let faucet_rx = if config.faucet_url.is_some() { - debug!(target: COMPONENT, "Initializing faucet testing task"); - Some(tasks.spawn_faucet(&config)) - } else { - warn!("Faucet URL not configured, skipping faucet testing"); - None - }; + let faucet_rx = config.faucet_url.is_some().then(|| tasks.spawn_faucet(&config)); + + let explorer_rx = config.explorer_url.is_some().then(|| tasks.spawn_explorer_checker(&config)); - // Initialize the counter increment and tracking tasks only if enabled. 
let (ntx_increment_rx, ntx_tracking_rx) = if config.disable_ntx_service { - debug!(target: COMPONENT, "NTX service disabled, skipping counter increment task"); (None, None) } else { - debug!(target: COMPONENT, "Initializing counter increment task"); let (increment_rx, tracking_rx) = tasks.spawn_ntx_service(&config).await?; (Some(increment_rx), Some(tracking_rx)) }; - // Initialize HTTP server. - debug!(target: COMPONENT, "Initializing HTTP server"); + let note_transport_rx = config + .note_transport_url + .is_some() + .then(|| tasks.spawn_note_transport_checker(&config)); + + let validator_rx = + config.validator_url.is_some().then(|| tasks.spawn_validator_checker(&config)); // Build the flat services Vec in the order the dashboard expects to render cards. let services = std::iter::once(rpc_rx) diff --git a/bin/network-monitor/src/counter.rs b/bin/network-monitor/src/counter.rs index 5db834cfd..3d8c56b0f 100644 --- a/bin/network-monitor/src/counter.rs +++ b/bin/network-monitor/src/counter.rs @@ -13,7 +13,8 @@ use miden_node_proto::clients::RpcClient; use miden_node_proto::generated::rpc::BlockHeaderByNumberRequest; use miden_node_proto::generated::transaction::ProvenTransaction; use miden_protocol::account::auth::AuthSecretKey; -use miden_protocol::account::{Account, AccountFile, AccountHeader, AccountId}; +use miden_protocol::account::{Account, AccountCode, AccountFile, AccountHeader, AccountId}; +use miden_protocol::asset::AssetVault; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::crypto::dsa::falcon512_poseidon2::SecretKey; use miden_protocol::note::{ @@ -36,21 +37,13 @@ use miden_tx::auth::BasicAuthenticator; use miden_tx::{LocalTransactionProver, TransactionExecutor}; use rand::{Rng, SeedableRng}; use rand_chacha::ChaCha20Rng; -use tokio::sync::{Mutex, watch}; +use tokio::sync::Mutex; use tracing::{error, info, instrument, warn}; -/// Number of consecutive increment failures before re-syncing the wallet account from the RPC. 
-const RESYNC_FAILURE_THRESHOLD: usize = 3; - -/// Number of consecutive increment failures before regenerating accounts from scratch. -const REGENERATE_FAILURE_THRESHOLD: usize = 10; - -/// Minimum time between account regeneration attempts. -const REGENERATE_COOLDOWN: Duration = Duration::from_secs(3600); - use crate::config::MonitorConfig; use crate::deploy::counter::COUNTER_SLOT_NAME; use crate::deploy::{MonitorDataStore, create_genesis_aware_rpc_client}; +use crate::service::Service; use crate::status::{ CounterTrackingDetails, IncrementDetails, @@ -60,6 +53,18 @@ use crate::status::{ }; use crate::{COMPONENT, current_unix_timestamp_secs}; +/// Number of consecutive increment failures before re-syncing the wallet account from the RPC. +const RESYNC_FAILURE_THRESHOLD: usize = 3; + +/// Number of consecutive increment failures before regenerating accounts from scratch. +const REGENERATE_FAILURE_THRESHOLD: usize = 10; + +/// Minimum time between account regeneration attempts. +const REGENERATE_COOLDOWN: Duration = Duration::from_secs(3600); + +// SHARED STATE +// ================================================================================================ + #[derive(Debug, Default, Clone)] pub struct LatencyState { pending: Option, @@ -67,688 +72,522 @@ pub struct LatencyState { last_latency_blocks: Option, } -/// Get the genesis block header. -async fn get_genesis_block_header(rpc_client: &mut RpcClient) -> Result { - let block_header_request = BlockHeaderByNumberRequest { - block_num: Some(BlockNumber::GENESIS.as_u32()), - include_mmr_proof: None, - }; - - let response = rpc_client - .get_block_header_by_number(block_header_request) - .await - .context("Failed to get genesis block header from RPC")? 
- .into_inner(); - - let genesis_block_header = response - .block_header - .ok_or_else(|| anyhow::anyhow!("No block header in response"))?; - - let block_header: BlockHeader = - genesis_block_header.try_into().context("Failed to convert block header")?; - - Ok(block_header) -} +// TX BUILDER +// ================================================================================================ -/// Fetch the storage header of the given account from RPC. +/// Everything needed to build and submit one increment network note. /// -/// Returns `None` if the account does not exist or has no details available. -async fn fetch_account_storage_header( - rpc_client: &mut RpcClient, - account_id: AccountId, -) -> Result> { - let request = build_account_request(account_id, false); - let resp = rpc_client.get_account(request).await?.into_inner(); - - let Some(details) = resp.details else { - return Ok(None); - }; +/// Produced by [`setup_increment_task`]. +struct TxBuilder { + wallet_account: Account, + counter_account: Account, + secret_key: SecretKey, + increment_script: NoteScript, + data_store: MonitorDataStore, + block_header: BlockHeader, + rng: ChaCha20Rng, +} - let storage_details = details.storage_details.context("missing storage details")?; - let storage_header = storage_details.header.context("missing storage header")?; +// FAILURE TRACKER +// ================================================================================================ - Ok(Some(storage_header)) +/// Tracks consecutive increment failures and gates re-sync / regeneration actions. +#[derive(Default)] +struct FailureTracker { + consecutive_failures: usize, + last_regeneration: Option, } -/// Fetch the latest nonce of the given account from RPC. -async fn fetch_counter_value( - rpc_client: &mut RpcClient, - account_id: AccountId, -) -> Result> { - let Some(storage_header) = fetch_account_storage_header(rpc_client, account_id).await? 
else { - return Ok(None); - }; +impl FailureTracker { + fn record_failure(&mut self) { + self.consecutive_failures += 1; + } - let counter_slot = storage_header - .slots - .iter() - .find(|slot| slot.slot_name == COUNTER_SLOT_NAME.as_str()) - .context(format!("counter slot '{}' not found", COUNTER_SLOT_NAME.as_str()))?; + fn reset(&mut self) { + self.consecutive_failures = 0; + } - // The counter value is stored as a Word, with the actual u64 value in the first element - let slot_value: Word = counter_slot - .commitment - .as_ref() - .context("missing storage slot value")? - .try_into() - .context("failed to convert slot value to word")?; + fn should_resync(&self) -> bool { + self.consecutive_failures >= RESYNC_FAILURE_THRESHOLD + } - let value = slot_value - .as_elements() - .first() - .expect("Word has 4 elements") - .as_canonical_u64(); + fn should_regenerate(&self) -> bool { + self.consecutive_failures >= REGENERATE_FAILURE_THRESHOLD + && self.last_regeneration.is_none_or(|t| t.elapsed() >= REGENERATE_COOLDOWN) + } - Ok(Some(value)) + fn mark_regenerated(&mut self) { + self.last_regeneration = Some(Instant::now()); + } } -/// Build an account request for the given account ID. -/// -/// If `include_code_and_vault` is true, uses dummy commitments to force the server -/// to return code and vault data (server only returns data when our commitment differs). 
-fn build_account_request( - account_id: AccountId, - include_code_and_vault: bool, -) -> miden_node_proto::generated::rpc::AccountRequest { - let id_bytes: [u8; 15] = account_id.into(); - let account_id_proto = - miden_node_proto::generated::account::AccountId { id: id_bytes.to_vec() }; +// INCREMENT SERVICE +// ================================================================================================ - let (code_commitment, asset_vault_commitment) = if include_code_and_vault { - let dummy: miden_node_proto::generated::primitives::Digest = Word::default().into(); - (Some(dummy), Some(dummy)) - } else { - (None, None) - }; +/// Periodically submits a network note that increments the counter account. +pub struct IncrementService { + config: MonitorConfig, + rpc_client: RpcClient, + tx: TxBuilder, + failures: FailureTracker, + details: IncrementDetails, + expected_counter_value: Arc, + latency_state: Arc>, +} - miden_node_proto::generated::rpc::AccountRequest { - account_id: Some(account_id_proto), - block_num: None, - details: Some(miden_node_proto::generated::rpc::account_request::AccountDetailRequest { - code_commitment, - asset_vault_commitment, - storage_maps: vec![], - }), +impl IncrementService { + pub async fn new( + config: MonitorConfig, + expected_counter_value: Arc, + latency_state: Arc>, + ) -> Result { + let mut rpc_client = + create_genesis_aware_rpc_client(&config.rpc_url, config.request_timeout).await?; + let (tx, details) = setup_increment_task(config.clone(), &mut rpc_client).await?; + Ok(Self { + config, + rpc_client, + tx, + failures: FailureTracker::default(), + details, + expected_counter_value, + latency_state, + }) } -} -/// Fetch an account from RPC and reconstruct the full Account. -/// -/// Uses dummy commitments to force the server to return all data (code, vault, storage header). -/// Only supports accounts with value slots; returns an error if storage maps are present. 
-async fn fetch_wallet_account( - rpc_client: &mut RpcClient, - account_id: AccountId, -) -> Result> { - use miden_protocol::account::AccountCode; - use miden_protocol::asset::AssetVault; + /// Applies a successful increment: updates the wallet nonce, bumps counters, and returns + /// the next expected counter value. + fn handle_increment_success(&mut self, final_account: &AccountHeader, tx_id: String) -> u64 { + let updated_wallet = Account::new( + self.tx.wallet_account.id(), + self.tx.wallet_account.vault().clone(), + self.tx.wallet_account.storage().clone(), + self.tx.wallet_account.code().clone(), + final_account.nonce(), + None, + ) + .expect("nonce-only update of an already-valid account cannot fail"); + self.tx.wallet_account = updated_wallet; + self.tx.data_store.update_account(self.tx.wallet_account.clone()); - let request = build_account_request(account_id, true); + self.details.success_count += 1; + self.details.last_tx_id = Some(tx_id); - let response = match rpc_client.get_account(request).await { - Ok(response) => response.into_inner(), - Err(e) => { - warn!(account.id = %account_id, err = %e, "failed to fetch wallet account via RPC"); - return Ok(None); - }, - }; + self.expected_counter_value.fetch_add(1, Ordering::Relaxed) + 1 + } - let Some(details) = response.details else { - if response.witness.is_some() { - info!( - account.id = %account_id, - "account found on-chain but cannot reconstruct full account from RPC response" - ); - } - return Ok(None); - }; + /// Re-sync the wallet account from the RPC after repeated failures. 
+ #[instrument( + parent = None, + target = COMPONENT, + name = "network_monitor.counter.try_resync_wallet_account", + skip_all, + fields(account.id = %self.tx.wallet_account.id()), + level = "warn", + err, + )] + async fn try_resync_wallet_account(&mut self) -> Result<()> { + let fresh_account = fetch_wallet_account(&mut self.rpc_client, self.tx.wallet_account.id()) + .await + .inspect_err(|e| { + error!(account.id = %self.tx.wallet_account.id(), err = ?e, "failed to re-sync wallet account from RPC"); + })? + .context("wallet account not found on-chain during re-sync") + .inspect_err(|e| { + error!(account.id = %self.tx.wallet_account.id(), err = ?e, "wallet account not found on-chain during re-sync"); + })?; + + info!(account.id = %self.tx.wallet_account.id(), "wallet account re-synced from RPC"); + self.tx.wallet_account = fresh_account; + self.tx.data_store.update_account(self.tx.wallet_account.clone()); + Ok(()) + } - let header = details.header.context("missing account header")?; - let nonce: u64 = header.nonce; + /// Regenerate accounts from scratch when re-sync is ineffective. + #[instrument( + parent = None, + target = COMPONENT, + name = "network_monitor.counter.try_regenerate_accounts", + skip_all, + level = "warn", + err, + )] + async fn try_regenerate_accounts(&mut self) -> Result<()> { + crate::deploy::force_recreate_accounts( + &self.config.wallet_filepath, + &self.config.counter_filepath, + &self.config.rpc_url, + ) + .await + .context("failed to regenerate accounts")?; - let code = details - .code - .map(|code_bytes| AccountCode::read_from_bytes(&code_bytes)) - .transpose() - .context("failed to deserialize account code")? 
- .context("server did not return account code")?; + let (tx, details) = setup_increment_task(self.config.clone(), &mut self.rpc_client).await?; + self.tx = tx; + self.details = details; - let vault = match details.vault_details { - Some(vault_details) if vault_details.too_many_assets => { - anyhow::bail!("account {account_id} has too many assets, cannot fetch full account"); - }, - Some(vault_details) => { - let assets: Vec = vault_details - .assets - .into_iter() - .map(TryInto::try_into) - .collect::>() - .context("failed to convert assets")?; - AssetVault::new(&assets).context("failed to create vault")? - }, - None => anyhow::bail!("server did not return asset vault for account {account_id}"), - }; + info!("account regeneration completed, increment task re-initialized"); + Ok(()) + } - let storage_details = details.storage_details.context("missing storage details")?; - let storage = build_account_storage(storage_details)?; + /// Create and submit a network note that increments the counter account. 
+ #[instrument( + parent = None, + target = COMPONENT, + name = "network_monitor.counter.submit_increment", + skip_all, + level = "info", + ret(level = "debug"), + err + )] + async fn submit_increment(&mut self) -> Result<(String, AccountHeader, BlockNumber)> { + let authenticator = BasicAuthenticator::new(&[AuthSecretKey::Falcon512Poseidon2( + self.tx.secret_key.clone(), + )]); + + let account_interface = AccountInterface::from_account(&self.tx.wallet_account); + + let (network_note, note_recipient) = create_network_note( + &self.tx.wallet_account, + &self.tx.counter_account, + self.tx.increment_script.clone(), + &mut self.tx.rng, + )?; + let script = account_interface.build_send_notes_script(&[network_note.into()], None)?; + + let executor = + TransactionExecutor::new(&self.tx.data_store).with_authenticator(&authenticator); + + let mut tx_args = TransactionArgs::default().with_tx_script(script); + tx_args.add_output_note_recipient(Box::new(note_recipient)); + + let executed_tx = Box::pin(executor.execute_transaction( + self.tx.wallet_account.id(), + self.tx.block_header.block_num(), + InputNotes::default(), + tx_args, + )) + .await + .context("Failed to execute transaction")?; - let account = Account::new(account_id, vault, storage, code, Felt::new(nonce), None) - .context("failed to create account")?; + let tx_inputs = executed_tx.tx_inputs().to_bytes(); + let final_account = executed_tx.final_account().clone(); - // Sanity check: verify reconstructed account matches header commitments - let expected_code_commitment: Word = header - .code_commitment - .context("missing code commitment in header")? - .try_into() - .context("invalid code commitment")?; - let expected_vault_root: Word = header - .vault_root - .context("missing vault root in header")? - .try_into() - .context("invalid vault root")?; - let expected_storage_commitment: Word = header - .storage_commitment - .context("missing storage commitment in header")? 
- .try_into() - .context("invalid storage commitment")?; + let prover = LocalTransactionProver::default(); + let proven_tx = prover.prove(executed_tx).await.context("Failed to prove transaction")?; - anyhow::ensure!( - account.code().commitment() == expected_code_commitment, - "code commitment mismatch: rebuilt={:?}, expected={:?}", - account.code().commitment(), - expected_code_commitment - ); - anyhow::ensure!( - account.vault().root() == expected_vault_root, - "vault root mismatch: rebuilt={:?}, expected={:?}", - account.vault().root(), - expected_vault_root - ); - anyhow::ensure!( - account.storage().to_commitment() == expected_storage_commitment, - "storage commitment mismatch: rebuilt={:?}, expected={:?}", - account.storage().to_commitment(), - expected_storage_commitment - ); + let request = ProvenTransaction { + transaction: proven_tx.to_bytes(), + transaction_inputs: Some(tx_inputs), + }; - info!(account.id = %account_id, "fetched wallet account from RPC"); - Ok(Some(account)) -} + let block_height: BlockNumber = self + .rpc_client + .submit_proven_transaction(request) + .await + .context("Failed to submit proven transaction to RPC")? + .into_inner() + .block_num + .into(); -/// Build account storage from the storage details returned by the server. -/// -/// This function only supports accounts with value slots. If any storage map slots -/// are encountered, an error is returned since the monitor only uses simple accounts. 
-fn build_account_storage( - storage_details: miden_node_proto::generated::rpc::AccountStorageDetails, -) -> Result { - use miden_protocol::account::{AccountStorage, StorageSlot}; + info!("Submitted proven transaction to RPC"); - let storage_header = storage_details.header.context("missing storage header")?; + let tx_id = proven_tx.id().to_hex(); - let mut slots = Vec::new(); - for slot in storage_header.slots { - let slot_name = miden_protocol::account::StorageSlotName::new(slot.slot_name.clone()) - .context("invalid slot name")?; - let value: Word = slot - .commitment - .context("missing slot value")? - .try_into() - .context("invalid slot value")?; + Ok((tx_id, final_account, block_height)) + } +} - // slot_type: 0 = Value, 1 = Map - anyhow::ensure!( - slot.slot_type == 0, - "storage map slots are not supported for this account" - ); +impl Service for IncrementService { + fn name(&self) -> &'static str { + "Local Transactions" + } - slots.push(StorageSlot::with_value(slot_name, value)); + fn interval(&self) -> Duration { + self.config.counter_increment_interval } - AccountStorage::new(slots).context("failed to create account storage") -} + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown( + self.name(), + ServiceDetails::NtxIncrement(IncrementDetails::default()), + ) + } -async fn setup_increment_task( - config: MonitorConfig, - rpc_client: &mut RpcClient, -) -> Result<( - IncrementDetails, - Account, - Account, - BlockHeader, - MonitorDataStore, - NoteScript, - SecretKey, -)> { - let details = IncrementDetails::default(); - // Load accounts from files - let wallet_account_file = - AccountFile::read(config.wallet_filepath).context("Failed to read wallet account file")?; - let wallet_account = fetch_wallet_account(rpc_client, wallet_account_file.account.id()) - .await? 
- .unwrap_or(wallet_account_file.account.clone()); + async fn check(&mut self) -> ServiceStatus { + let mut last_error = None; - let AuthSecretKey::Falcon512Poseidon2(secret_key) = wallet_account_file - .auth_secret_keys - .first() - .expect("wallet account file should have one auth secret key") - .clone() - else { - anyhow::bail!("Failed to load wallet account, auth secret key not found") - }; - - let counter_account = match load_counter_account(&config.counter_filepath) { - Ok(account) => account, - Err(e) => { - error!("Failed to load counter account: {:?}", e); - anyhow::bail!("Failed to load counter account: {e}") - }, - }; - - // Get the genesis block header - let block_header = get_genesis_block_header(rpc_client).await?; - - // Create the increment procedure script and get the library - let increment_script = create_increment_script()?; - - // Create unified data store for transaction execution - let mut data_store = MonitorDataStore::new(block_header.clone(), PartialBlockchain::default()); - data_store.add_account(wallet_account.clone()); - data_store.add_account(counter_account.clone()); - - Ok(( - details, - wallet_account, - counter_account, - block_header, - data_store, - increment_script, - secret_key, - )) -} - -/// Run the counter increment task. -/// -/// This function periodically creates network notes that target the counter account and sends -/// transactions to increment it. -/// -/// # Arguments -/// -/// * `config` - The monitor configuration containing file paths and intervals. -/// * `tx` - The watch channel sender for status updates. -/// * `expected_counter_value` - Shared atomic counter for tracking expected value based on -/// successful increments. -/// -/// # Returns -/// -/// This function runs indefinitely, only returning on error. 
-pub async fn run_increment_task( - config: MonitorConfig, - tx: watch::Sender, - expected_counter_value: Arc, - latency_state: Arc>, -) -> Result<()> { - // Create RPC client - let mut rpc_client = - create_genesis_aware_rpc_client(&config.rpc_url, config.request_timeout).await?; - - let ( - mut details, - mut wallet_account, - mut counter_account, - mut block_header, - mut data_store, - mut increment_script, - mut secret_key, - ) = setup_increment_task(config.clone(), &mut rpc_client).await?; - - let mut rng = ChaCha20Rng::from_os_rng(); - let mut interval = tokio::time::interval(config.counter_increment_interval); - let mut consecutive_failures: usize = 0; - let mut last_regeneration: Option = None; - - loop { - interval.tick().await; - - let mut last_error = None; - - match create_and_submit_network_note( - &wallet_account, - &counter_account, - &secret_key, - &mut rpc_client, - &data_store, - &block_header, - &increment_script, - &mut rng, - ) - .await - { + match self.submit_increment().await { Ok((tx_id, final_account, block_height)) => { - consecutive_failures = 0; - - let target_value = handle_increment_success( - &mut wallet_account, - &final_account, - &mut data_store, - &mut details, - tx_id, - &expected_counter_value, - )?; - - { - let mut guard = latency_state.lock().await; - guard.pending = Some(PendingLatencyDetails { - submit_height: block_height.as_u32(), - target_value, - }); - guard.pending_started = Some(Instant::now()); - } + self.failures.reset(); + let target_value = self.handle_increment_success(&final_account, tx_id); + let mut guard = self.latency_state.lock().await; + guard.pending = Some(PendingLatencyDetails { + submit_height: block_height.as_u32(), + target_value, + }); + guard.pending_started = Some(Instant::now()); }, Err(e) => { - consecutive_failures += 1; - last_error = Some(handle_increment_failure(&mut details, &e)); - - if consecutive_failures >= RESYNC_FAILURE_THRESHOLD - && try_resync_wallet_account( - &mut rpc_client, - 
&mut wallet_account, - &mut data_store, - ) - .await - .is_ok() - { - consecutive_failures = 0; + error!("Failed to create and submit network note: {:?}", e); + self.details.failure_count += 1; + self.failures.record_failure(); + last_error = Some(format!("create/submit note failed: {e}")); + + if self.failures.should_resync() && self.try_resync_wallet_account().await.is_ok() { + self.failures.reset(); } - // If re-sync keeps failing, regenerate accounts from scratch (rate-limited). - let cooldown_elapsed = - last_regeneration.is_none_or(|t| t.elapsed() >= REGENERATE_COOLDOWN); - if consecutive_failures >= REGENERATE_FAILURE_THRESHOLD && cooldown_elapsed { + if self.failures.should_regenerate() { warn!( - consecutive_failures, + consecutive_failures = self.failures.consecutive_failures, "re-sync ineffective, regenerating accounts from scratch" ); - last_regeneration = Some(Instant::now()); - match try_regenerate_accounts(&config, &mut rpc_client).await { - Ok(new_state) => { - ( - details, - wallet_account, - counter_account, - block_header, - data_store, - increment_script, - secret_key, - ) = new_state; - consecutive_failures = 0; + self.failures.mark_regenerated(); + match self.try_regenerate_accounts().await { + Ok(()) => self.failures.reset(), + Err(regen_err) => { + error!("account regeneration failed: {regen_err:?}"); }, - Err(regen_err) => error!("account regeneration failed: {regen_err:?}"), } } }, } { - let guard = latency_state.lock().await; - details.last_latency_blocks = guard.last_latency_blocks; + let guard = self.latency_state.lock().await; + self.details.last_latency_blocks = guard.last_latency_blocks; } - let status = build_increment_status(&details, last_error); - send_status(&tx, status)?; + build_increment_status(&self.details, last_error) } } -/// Handle the success path for increment operations. -/// -/// Returns the next expected counter value after a successful increment. 
-fn handle_increment_success( - wallet_account: &mut Account, - final_account: &AccountHeader, - data_store: &mut MonitorDataStore, - details: &mut IncrementDetails, - tx_id: String, - expected_counter_value: &Arc, -) -> Result { - let updated_wallet = Account::new( - wallet_account.id(), - wallet_account.vault().clone(), - wallet_account.storage().clone(), - wallet_account.code().clone(), - final_account.nonce(), - None, - )?; - *wallet_account = updated_wallet; - data_store.update_account(wallet_account.clone()); - - details.success_count += 1; - details.last_tx_id = Some(tx_id); - - // Increment the expected counter value - let new_expected = expected_counter_value.fetch_add(1, Ordering::Relaxed) + 1; - - Ok(new_expected) -} +// COUNTER TRACKING SERVICE +// ================================================================================================ -/// Re-sync the wallet account from the RPC after repeated failures. -#[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.counter.try_resync_wallet_account", - skip_all, - fields(account.id = %wallet_account.id()), - level = "warn", - err, -)] -async fn try_resync_wallet_account( - rpc_client: &mut RpcClient, - wallet_account: &mut Account, - data_store: &mut MonitorDataStore, -) -> Result<()> { - let fresh_account = fetch_wallet_account(rpc_client, wallet_account.id()) - .await - .inspect_err(|e| { - error!(account.id = %wallet_account.id(), err = ?e, "failed to re-sync wallet account from RPC"); - })? - .context("wallet account not found on-chain during re-sync") - .inspect_err(|e| { - error!(account.id = %wallet_account.id(), err = ?e, "wallet account not found on-chain during re-sync"); - })?; - - info!(account.id = %wallet_account.id(), "wallet account re-synced from RPC"); - *wallet_account = fresh_account; - data_store.update_account(wallet_account.clone()); - Ok(()) +/// Periodically fetches the counter value and reports how far the observed value trails the +/// expected value. 
+pub struct CounterTrackingService { + config: MonitorConfig, + rpc_client: RpcClient, + counter_account: Account, + details: CounterTrackingDetails, + expected_counter_value: Arc, + latency_state: Arc>, } -/// Regenerate accounts from scratch when re-sync is ineffective. -/// -/// Creates fresh wallet and counter accounts, deploys them to the network, and re-initializes -/// the increment task state. This is a last resort after [`REGENERATE_FAILURE_THRESHOLD`] -/// consecutive failures. -#[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.counter.try_regenerate_accounts", - skip_all, - level = "warn", - err, -)] -async fn try_regenerate_accounts( - config: &MonitorConfig, - rpc_client: &mut RpcClient, -) -> Result<( - IncrementDetails, - Account, - Account, - BlockHeader, - MonitorDataStore, - NoteScript, - SecretKey, -)> { - crate::deploy::force_recreate_accounts( - &config.wallet_filepath, - &config.counter_filepath, - &config.rpc_url, - ) - .await - .context("failed to regenerate accounts")?; - - // Re-initialize the full task state from the newly-created account files. - let state = setup_increment_task(config.clone(), rpc_client).await?; - - info!("account regeneration completed, increment task re-initialized"); - Ok(state) -} +impl CounterTrackingService { + pub async fn new( + config: MonitorConfig, + expected_counter_value: Arc, + latency_state: Arc>, + ) -> Result { + let mut rpc_client = + create_genesis_aware_rpc_client(&config.rpc_url, config.request_timeout).await?; + let counter_account = load_counter_account(&config.counter_filepath) + .context("Failed to load counter account")?; + + let mut details = CounterTrackingDetails::default(); + initialize_tracking_state( + &mut rpc_client, + &counter_account, + &expected_counter_value, + &mut details, + ) + .await; -/// Handle the failure path when creating/submitting the network note fails. 
-fn handle_increment_failure(details: &mut IncrementDetails, error: &anyhow::Error) -> String { - error!("Failed to create and submit network note: {:?}", error); - details.failure_count += 1; - format!("create/submit note failed: {error}") -} + Ok(Self { + config, + rpc_client, + counter_account, + details, + expected_counter_value, + latency_state, + }) + } -/// Build a `ServiceStatus` snapshot from the current increment details and last error. -fn build_increment_status(details: &IncrementDetails, last_error: Option) -> ServiceStatus { - let service_details = ServiceDetails::NtxIncrement(details.clone()); + /// The increment service regenerates accounts on persistent failure and rewrites the + /// counter account file. If the file's account ID has changed, switch to the new account + /// and reset tracking state. + async fn reload_counter_account_if_changed(&mut self) { + let reloaded = match load_counter_account(&self.config.counter_filepath) { + Ok(account) => account, + Err(e) => { + warn!(err = ?e, "failed to reload counter account file"); + return; + }, + }; - // If the most recent attempt failed, surface the service as unhealthy so the - // dashboard reflects that the increment pipeline is not currently working. - // Also unhealthy if we've never succeeded but have failures. 
- if let Some(err) = last_error { - ServiceStatus::unhealthy("Local Transactions", err, service_details) - } else if details.success_count == 0 && details.failure_count > 0 { - ServiceStatus::unhealthy( - "Local Transactions", - format!("no successful increments ({} failures)", details.failure_count), - service_details, + if reloaded.id() == self.counter_account.id() { + return; + } + + info!( + old.id = %self.counter_account.id(), + new.id = %reloaded.id(), + "counter account file changed, resetting tracking state", + ); + self.counter_account = reloaded; + self.details = CounterTrackingDetails::default(); + initialize_tracking_state( + &mut self.rpc_client, + &self.counter_account, + &self.expected_counter_value, + &mut self.details, ) - } else { - ServiceStatus::healthy("Local Transactions", service_details) + .await; } -} -/// Send the status update, bailing on error. -fn send_status(tx: &watch::Sender, status: ServiceStatus) -> Result<()> { - if tx.send(status).is_err() { - error!("Failed to send counter increment status update"); - anyhow::bail!("Failed to send counter increment status update") + /// Poll the counter once, updating details and latency tracking state. 
+ async fn poll_counter_once(&mut self) -> Option { + let mut last_error = None; + let current_time = current_unix_timestamp_secs(); + + match fetch_counter_value(&mut self.rpc_client, self.counter_account.id()).await { + Ok(Some(value)) => { + self.details.current_value = Some(value); + self.details.last_updated = Some(current_time); + + update_expected_and_pending(&mut self.details, &self.expected_counter_value, value); + self.handle_latency_tracking(value, &mut last_error).await; + }, + Ok(None) => { + // Counter value not available, but not an error + }, + Err(e) => { + error!("Failed to fetch counter value: {:?}", e); + last_error = Some(format!("fetch counter value failed: {e}")); + }, + } + + last_error + } + + /// Update latency tracking state, performing RPC as needed while minimizing lock hold time. + async fn handle_latency_tracking( + &mut self, + observed_value: u64, + last_error: &mut Option, + ) { + let (pending, pending_started) = { + let guard = self.latency_state.lock().await; + (guard.pending.clone(), guard.pending_started) + }; + + let Some(pending) = pending else { + return; + }; + + if observed_value >= pending.target_value { + match fetch_chain_tip(&mut self.rpc_client).await { + Ok(observed_height) => { + let latency_blocks = observed_height.saturating_sub(pending.submit_height); + let mut guard = self.latency_state.lock().await; + if guard.pending.as_ref().map(|p| p.target_value) == Some(pending.target_value) + { + guard.last_latency_blocks = Some(latency_blocks); + guard.pending = None; + guard.pending_started = None; + } + }, + Err(e) => { + *last_error = Some(format!("Failed to fetch chain tip for latency calc: {e}")); + }, + } + } else if let Some(started) = pending_started { + if Instant::now().saturating_duration_since(started) + >= self.config.counter_latency_timeout + { + warn!( + "Latency measurement timed out after {:?} for target value {}", + self.config.counter_latency_timeout, pending.target_value + ); + let mut guard = 
self.latency_state.lock().await; + if guard.pending.as_ref().map(|p| p.target_value) == Some(pending.target_value) { + guard.pending = None; + guard.pending_started = None; + } + *last_error = Some(format!( + "Timed out after {:?} waiting for counter to reach {}", + self.config.counter_latency_timeout, pending.target_value + )); + } + } } - Ok(()) } -/// Run the counter tracking task. -/// -/// This function periodically fetches the current counter value from the network -/// and updates the tracking details. -/// -/// # Arguments -/// -/// * `config` - The monitor configuration containing file paths and intervals. -/// * `tx` - The watch channel sender for status updates. -/// * `expected_counter_value` - Shared atomic counter for tracking expected value based on -/// successful increments. -/// -/// # Returns -/// -/// This function runs indefinitely, only returning on error. -pub async fn run_counter_tracking_task( - config: MonitorConfig, - tx: watch::Sender, - expected_counter_value: Arc, - latency_state: Arc>, -) -> Result<()> { - // Create RPC client - let mut rpc_client = - create_genesis_aware_rpc_client(&config.rpc_url, config.request_timeout).await?; - - // Load counter account to get the account ID - let mut counter_account = match load_counter_account(&config.counter_filepath) { - Ok(account) => account, - Err(e) => { - error!("Failed to load counter account: {:?}", e); - anyhow::bail!("Failed to load counter account: {e}") - }, - }; +impl Service for CounterTrackingService { + fn name(&self) -> &'static str { + "Network Transactions" + } - let mut details = CounterTrackingDetails::default(); - initialize_counter_tracking_state( - &mut rpc_client, - &counter_account, - &expected_counter_value, - &mut details, - ) - .await; - - let mut poll_interval = tokio::time::interval(config.counter_increment_interval / 2); - - loop { - poll_interval.tick().await; - - // The increment task may regenerate accounts when doesn't fixes the card, reload from - // the 
account file so tracking follows the new account. - reload_counter_account_if_changed( - &config, - &mut counter_account, - &mut rpc_client, - &expected_counter_value, - &mut details, - ) - .await; + fn interval(&self) -> Duration { + // Tracking polls twice per increment cadence so it catches freshly-incremented values + // soon after submission. + self.config.counter_increment_interval / 2 + } - let last_error = poll_counter_once( - &mut rpc_client, - &counter_account, - &expected_counter_value, - &latency_state, - &mut details, - &config, - ) - .await; - let status = build_tracking_status(&details, last_error); - send_status(&tx, status)?; + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown(self.name(), ServiceDetails::NtxTracking(self.details.clone())) + } + + async fn check(&mut self) -> ServiceStatus { + self.reload_counter_account_if_changed().await; + let last_error = self.poll_counter_once().await; + build_tracking_status(&self.details, last_error) } } -/// Reload the counter account from disk and re-initialize tracking state if its ID changed. -/// -/// The increment task regenerates accounts on persistent failure and rewrites the account -/// file. Without this the tracking task would keep polling the stale account ID forever. -async fn reload_counter_account_if_changed( - config: &MonitorConfig, - counter_account: &mut Account, +// SETUP +// ================================================================================================ + +/// Load wallet + counter accounts, fetch the genesis block header, and build the data store +/// and increment script needed to produce network notes. 
+async fn setup_increment_task( + config: MonitorConfig, rpc_client: &mut RpcClient, - expected_counter_value: &Arc, - details: &mut CounterTrackingDetails, -) { - let reloaded = match load_counter_account(&config.counter_filepath) { - Ok(account) => account, - Err(e) => { - warn!(err = ?e, "failed to reload counter account file"); - return; - }, +) -> Result<(TxBuilder, IncrementDetails)> { + let wallet_account_file = + AccountFile::read(config.wallet_filepath).context("Failed to read wallet account file")?; + let wallet_account = fetch_wallet_account(rpc_client, wallet_account_file.account.id()) + .await? + .unwrap_or(wallet_account_file.account.clone()); + + let AuthSecretKey::Falcon512Poseidon2(secret_key) = wallet_account_file + .auth_secret_keys + .first() + .expect("wallet account file should have one auth secret key") + .clone() + else { + anyhow::bail!("Failed to load wallet account, auth secret key not found") }; - if reloaded.id() == counter_account.id() { - return; - } + let counter_account = load_counter_account(&config.counter_filepath) + .inspect_err(|e| error!("Failed to load counter account: {:?}", e))?; - info!( - old.id = %counter_account.id(), - new.id = %reloaded.id(), - "counter account file changed, resetting tracking state", - ); - *counter_account = reloaded; - *details = CounterTrackingDetails::default(); - initialize_counter_tracking_state(rpc_client, counter_account, expected_counter_value, details) - .await; + let block_header = get_genesis_block_header(rpc_client).await?; + + let increment_script = create_increment_script()?; + + let mut data_store = MonitorDataStore::new(block_header.clone(), PartialBlockchain::default()); + data_store.add_account(wallet_account.clone()); + data_store.add_account(counter_account.clone()); + + let tx = TxBuilder { + wallet_account, + counter_account, + secret_key, + increment_script, + data_store, + block_header, + rng: ChaCha20Rng::from_os_rng(), + }; + + Ok((tx, IncrementDetails::default())) } /// 
Initialize tracking state by fetching the current counter value from the node. -/// -/// Populates `expected_counter_value` and seeds `details` with the latest observed -/// values so the first poll iteration starts from a consistent snapshot. -async fn initialize_counter_tracking_state( +async fn initialize_tracking_state( rpc_client: &mut RpcClient, counter_account: &Account, expected_counter_value: &Arc, @@ -773,40 +612,40 @@ async fn initialize_counter_tracking_state( } } -/// Poll the counter once, updating details and latency tracking state. -/// -/// Returns a human-readable error string when the poll fails or latency tracking -/// cannot complete; otherwise returns `None`. -async fn poll_counter_once( - rpc_client: &mut RpcClient, - counter_account: &Account, - expected_counter_value: &Arc, - latency_state: &Arc>, - details: &mut CounterTrackingDetails, - config: &MonitorConfig, -) -> Option { - let mut last_error = None; - let current_time = current_unix_timestamp_secs(); +// STATUS BUILDERS +// ================================================================================================ - match fetch_counter_value(rpc_client, counter_account.id()).await { - Ok(Some(value)) => { - details.current_value = Some(value); - details.last_updated = Some(current_time); +/// Build a `ServiceStatus` snapshot from the current increment details and last error. 
+fn build_increment_status(details: &IncrementDetails, last_error: Option) -> ServiceStatus { + let service_details = ServiceDetails::NtxIncrement(details.clone()); - update_expected_and_pending(details, expected_counter_value, value); - handle_latency_tracking(rpc_client, latency_state, config, value, &mut last_error) - .await; - }, - Ok(None) => { - // Counter value not available, but not an error - }, - Err(e) => { - error!("Failed to fetch counter value: {:?}", e); - last_error = Some(format!("fetch counter value failed: {e}")); - }, + if let Some(err) = last_error { + ServiceStatus::unhealthy("Local Transactions", err, service_details) + } else if details.success_count == 0 && details.failure_count > 0 { + ServiceStatus::unhealthy( + "Local Transactions", + format!("no successful increments ({} failures)", details.failure_count), + service_details, + ) + } else { + ServiceStatus::healthy("Local Transactions", service_details) } +} - last_error +/// Build a `ServiceStatus` snapshot from the current tracking details and last error. +fn build_tracking_status( + details: &CounterTrackingDetails, + last_error: Option, +) -> ServiceStatus { + let service_details = ServiceDetails::NtxTracking(details.clone()); + + if let Some(err) = last_error { + ServiceStatus::unhealthy("Network Transactions", err, service_details) + } else if details.current_value.is_some() { + ServiceStatus::healthy("Network Transactions", service_details) + } else { + ServiceStatus::unknown("Network Transactions", service_details) + } } /// Update expected and pending counters based on the latest observed value. @@ -829,159 +668,253 @@ fn update_expected_and_pending( } } -/// Update latency tracking state, performing RPC as needed while minimizing lock hold time. +// RPC HELPERS +// ================================================================================================ + +/// Get the genesis block header. 
+async fn get_genesis_block_header(rpc_client: &mut RpcClient) -> Result { + let block_header_request = BlockHeaderByNumberRequest { + block_num: Some(BlockNumber::GENESIS.as_u32()), + include_mmr_proof: None, + }; + + let response = rpc_client + .get_block_header_by_number(block_header_request) + .await + .context("Failed to get genesis block header from RPC")? + .into_inner(); + + let genesis_block_header = response + .block_header + .ok_or_else(|| anyhow::anyhow!("No block header in response"))?; + + let block_header: BlockHeader = + genesis_block_header.try_into().context("Failed to convert block header")?; + + Ok(block_header) +} + +/// Fetch the storage header of the given account from RPC. /// -/// Populates `last_error` when latency bookkeeping fails or times out. -async fn handle_latency_tracking( +/// Returns `None` if the account does not exist or has no details available. +async fn fetch_account_storage_header( rpc_client: &mut RpcClient, - latency_state: &Arc>, - config: &MonitorConfig, - observed_value: u64, - last_error: &mut Option, -) { - let (pending, pending_started) = { - let guard = latency_state.lock().await; - (guard.pending.clone(), guard.pending_started) + account_id: AccountId, +) -> Result> { + let request = build_account_request(account_id, false); + let resp = rpc_client.get_account(request).await?.into_inner(); + + let Some(details) = resp.details else { + return Ok(None); }; - if let Some(pending) = pending { - if observed_value >= pending.target_value { - match fetch_chain_tip(rpc_client).await { - Ok(observed_height) => { - let latency_blocks = observed_height.saturating_sub(pending.submit_height); - let mut guard = latency_state.lock().await; - if guard.pending.as_ref().map(|p| p.target_value) == Some(pending.target_value) - { - guard.last_latency_blocks = Some(latency_blocks); - guard.pending = None; - guard.pending_started = None; - } - }, - Err(e) => { - *last_error = Some(format!("Failed to fetch chain tip for latency calc: 
{e}")); - }, - } - } else if let Some(started) = pending_started { - if Instant::now().saturating_duration_since(started) >= config.counter_latency_timeout { - warn!( - "Latency measurement timed out after {:?} for target value {}", - config.counter_latency_timeout, pending.target_value - ); - let mut guard = latency_state.lock().await; - if guard.pending.as_ref().map(|p| p.target_value) == Some(pending.target_value) { - guard.pending = None; - guard.pending_started = None; - } - *last_error = Some(format!( - "Timed out after {:?} waiting for counter to reach {}", - config.counter_latency_timeout, pending.target_value - )); - } - } - } + let storage_details = details.storage_details.context("missing storage details")?; + let storage_header = storage_details.header.context("missing storage header")?; + + Ok(Some(storage_header)) } -/// Build a `ServiceStatus` snapshot from the current tracking details and last error. -fn build_tracking_status( - details: &CounterTrackingDetails, - last_error: Option, -) -> ServiceStatus { - let service_details = ServiceDetails::NtxTracking(details.clone()); +/// Fetch the latest nonce of the given account from RPC. +async fn fetch_counter_value( + rpc_client: &mut RpcClient, + account_id: AccountId, +) -> Result> { + let Some(storage_header) = fetch_account_storage_header(rpc_client, account_id).await? else { + return Ok(None); + }; - // If the latest poll failed, surface the service as unhealthy even if we have - // a previously cached value, so the dashboard shows that tracking is degraded. 
- if let Some(err) = last_error { - ServiceStatus::unhealthy("Network Transactions", err, service_details) - } else if details.current_value.is_some() { - ServiceStatus::healthy("Network Transactions", service_details) - } else { - ServiceStatus::unknown("Network Transactions", service_details) - } + let counter_slot = storage_header + .slots + .iter() + .find(|slot| slot.slot_name == COUNTER_SLOT_NAME.as_str()) + .context(format!("counter slot '{}' not found", COUNTER_SLOT_NAME.as_str()))?; + + // The counter value is stored as a Word, with the actual u64 value in the first element + let slot_value: Word = counter_slot + .commitment + .as_ref() + .context("missing storage slot value")? + .try_into() + .context("failed to convert slot value to word")?; + + let value = slot_value + .as_elements() + .first() + .expect("Word has 4 elements") + .as_canonical_u64(); + + Ok(Some(value)) } -/// Load counter account from file. -fn load_counter_account(file_path: &Path) -> Result { - let account_file = - AccountFile::read(file_path).context("Failed to read counter account file")?; +/// Build an account request for the given account ID. +/// +/// If `include_code_and_vault` is true, uses dummy commitments to force the server +/// to return code and vault data (server only returns data when our commitment differs). 
+fn build_account_request( + account_id: AccountId, + include_code_and_vault: bool, +) -> miden_node_proto::generated::rpc::AccountRequest { + let id_bytes: [u8; 15] = account_id.into(); + let account_id_proto = + miden_node_proto::generated::account::AccountId { id: id_bytes.to_vec() }; - Ok(account_file.account.clone()) + let (code_commitment, asset_vault_commitment) = if include_code_and_vault { + let dummy: miden_node_proto::generated::primitives::Digest = Word::default().into(); + (Some(dummy), Some(dummy)) + } else { + (None, None) + }; + + miden_node_proto::generated::rpc::AccountRequest { + account_id: Some(account_id_proto), + block_num: None, + details: Some(miden_node_proto::generated::rpc::account_request::AccountDetailRequest { + code_commitment, + asset_vault_commitment, + storage_maps: vec![], + }), + } } -/// Create and submit a network note that targets the counter account. -#[expect(clippy::too_many_arguments)] -#[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.counter.create_and_submit_network_note", - skip_all, - level = "info", - ret(level = "debug"), - err -)] -async fn create_and_submit_network_note( - wallet_account: &Account, - counter_account: &Account, - secret_key: &SecretKey, +/// Fetch an account from RPC and reconstruct the full Account. +/// +/// Uses dummy commitments to force the server to return all data (code, vault, storage header). +/// Only supports accounts with value slots; returns an error if storage maps are present. 
+async fn fetch_wallet_account( rpc_client: &mut RpcClient, - data_store: &MonitorDataStore, - block_header: &BlockHeader, - increment_script: &NoteScript, - rng: &mut ChaCha20Rng, -) -> Result<(String, AccountHeader, BlockNumber)> { - // Create authenticator for transaction signing - let authenticator = - BasicAuthenticator::new(&[AuthSecretKey::Falcon512Poseidon2(secret_key.clone())]); + account_id: AccountId, +) -> Result> { + let request = build_account_request(account_id, true); - let account_interface = AccountInterface::from_account(wallet_account); + let response = match rpc_client.get_account(request).await { + Ok(response) => response.into_inner(), + Err(e) => { + warn!(account.id = %account_id, err = %e, "failed to fetch wallet account via RPC"); + return Ok(None); + }, + }; - let (network_note, note_recipient) = - create_network_note(wallet_account, counter_account, increment_script.clone(), rng)?; - let script = account_interface.build_send_notes_script(&[network_note.into()], None)?; + let Some(details) = response.details else { + if response.witness.is_some() { + info!( + account.id = %account_id, + "account found on-chain but cannot reconstruct full account from RPC response" + ); + } + return Ok(None); + }; - // Create transaction executor - let executor = TransactionExecutor::new(data_store).with_authenticator(&authenticator); + let header = details.header.context("missing account header")?; + let nonce: u64 = header.nonce; - // Execute the transaction with the network note - let mut tx_args = TransactionArgs::default().with_tx_script(script); - tx_args.add_output_note_recipient(Box::new(note_recipient)); + let code = details + .code + .map(|code_bytes| AccountCode::read_from_bytes(&code_bytes)) + .transpose() + .context("failed to deserialize account code")? 
+ .context("server did not return account code")?; - let executed_tx = Box::pin(executor.execute_transaction( - wallet_account.id(), - block_header.block_num(), - InputNotes::default(), - tx_args, - )) - .await - .context("Failed to execute transaction")?; + let vault = match details.vault_details { + Some(vault_details) if vault_details.too_many_assets => { + anyhow::bail!("account {account_id} has too many assets, cannot fetch full account"); + }, + Some(vault_details) => { + let assets: Vec = vault_details + .assets + .into_iter() + .map(TryInto::try_into) + .collect::>() + .context("failed to convert assets")?; + AssetVault::new(&assets).context("failed to create vault")? + }, + None => anyhow::bail!("server did not return asset vault for account {account_id}"), + }; - let tx_inputs = executed_tx.tx_inputs().to_bytes(); + let storage_details = details.storage_details.context("missing storage details")?; + let storage = build_account_storage(storage_details)?; - let final_account = executed_tx.final_account().clone(); + let account = Account::new(account_id, vault, storage, code, Felt::new(nonce), None) + .context("failed to create account")?; - // Prove the transaction - let prover = LocalTransactionProver::default(); - let proven_tx = prover.prove(executed_tx).await.context("Failed to prove transaction")?; + // Sanity check: verify reconstructed account matches header commitments + let expected_code_commitment: Word = header + .code_commitment + .context("missing code commitment in header")? + .try_into() + .context("invalid code commitment")?; + let expected_vault_root: Word = header + .vault_root + .context("missing vault root in header")? + .try_into() + .context("invalid vault root")?; + let expected_storage_commitment: Word = header + .storage_commitment + .context("missing storage commitment in header")? 
+ .try_into() + .context("invalid storage commitment")?; - // Submit the proven transaction - let request = ProvenTransaction { - transaction: proven_tx.to_bytes(), - transaction_inputs: Some(tx_inputs), - }; + anyhow::ensure!( + account.code().commitment() == expected_code_commitment, + "code commitment mismatch: rebuilt={:?}, expected={:?}", + account.code().commitment(), + expected_code_commitment + ); + anyhow::ensure!( + account.vault().root() == expected_vault_root, + "vault root mismatch: rebuilt={:?}, expected={:?}", + account.vault().root(), + expected_vault_root + ); + anyhow::ensure!( + account.storage().to_commitment() == expected_storage_commitment, + "storage commitment mismatch: rebuilt={:?}, expected={:?}", + account.storage().to_commitment(), + expected_storage_commitment + ); - let block_height: BlockNumber = rpc_client - .submit_proven_transaction(request) - .await - .context("Failed to submit proven transaction to RPC")? - .into_inner() - .block_num - .into(); + info!(account.id = %account_id, "fetched wallet account from RPC"); + Ok(Some(account)) +} - info!("Submitted proven transaction to RPC"); +/// Build account storage from the storage details returned by the server. +/// +/// This function only supports accounts with value slots. If any storage map slots +/// are encountered, an error is returned since the monitor only uses simple accounts. 
+fn build_account_storage( + storage_details: miden_node_proto::generated::rpc::AccountStorageDetails, +) -> Result { + use miden_protocol::account::{AccountStorage, StorageSlot}; + + let storage_header = storage_details.header.context("missing storage header")?; - // Use the transaction ID from the proven transaction - let tx_id = proven_tx.id().to_hex(); + let mut slots = Vec::new(); + for slot in storage_header.slots { + let slot_name = miden_protocol::account::StorageSlotName::new(slot.slot_name.clone()) + .context("invalid slot name")?; + let value: Word = slot + .commitment + .context("missing slot value")? + .try_into() + .context("invalid slot value")?; - Ok((tx_id, final_account, block_height)) + // slot_type: 0 = Value, 1 = Map + anyhow::ensure!( + slot.slot_type == 0, + "storage map slots are not supported for this account" + ); + + slots.push(StorageSlot::with_value(slot_name, value)); + } + + AccountStorage::new(slots).context("failed to create account storage") +} + +/// Load counter account from file. +fn load_counter_account(file_path: &Path) -> Result { + let account_file = + AccountFile::read(file_path).context("Failed to read counter account file")?; + + Ok(account_file.account.clone()) } /// Create the increment procedure script. 
@@ -993,7 +926,6 @@ fn create_increment_script() -> Result { .with_linked_module("external_contract::counter_contract", script) .context("Failed to create script builder with library")?; - // Compile the script directly as a NoteScript let note_script = script_builder .compile_note_script(include_str!(concat!( env!("CARGO_MANIFEST_DIR"), @@ -1011,8 +943,6 @@ fn create_network_note( script: NoteScript, rng: &mut ChaCha20Rng, ) -> Result<(Note, NoteRecipient)> { - // Create the NetworkAccountTarget attachment - this is required for the note to be - // recognized as a network note by the ntx-builder let target = NetworkAccountTarget::new(counter_account.id(), NoteExecutionHint::Always) .context("Failed to create NetworkAccountTarget for counter account")?; let attachment: NoteAttachment = target.into(); diff --git a/bin/network-monitor/src/explorer.rs b/bin/network-monitor/src/explorer.rs index 3d3499980..cc0d2439a 100644 --- a/bin/network-monitor/src/explorer.rs +++ b/bin/network-monitor/src/explorer.rs @@ -6,12 +6,11 @@ use std::time::Duration; use reqwest::Client; use serde::Serialize; -use tokio::sync::watch; -use tokio::time::MissedTickBehavior; -use tracing::{info, instrument}; +use tracing::instrument; use url::Url; use crate::COMPONENT; +use crate::service::Service; use crate::status::{ExplorerStatusDetails, ServiceDetails, ServiceStatus}; const LATEST_BLOCK_QUERY: &str = " @@ -48,103 +47,69 @@ const LATEST_BLOCK_REQUEST: GraphqlRequest = GraphqlRequest { variables: EmptyVariables, }; -/// Runs a task that continuously checks explorer status and updates a watch channel. -/// -/// This function spawns a task that periodically checks the explorer service status -/// and sends updates through a watch channel. -/// -/// # Arguments -/// -/// * `explorer_url` - The URL of the explorer service. -/// * `name` - The name of the explorer. -/// * `status_sender` - The sender for the watch channel. 
-/// * `status_check_interval` - The interval at which to check the status of the services. -/// -/// # Returns -/// -/// `Ok(())` if the monitoring task runs and completes successfully, or an error if there are -/// connection issues or failures while checking the explorer status. -pub async fn run_explorer_status_task( - explorer_url: Url, - name: String, - status_sender: watch::Sender, - status_check_interval: Duration, +pub struct ExplorerService { + url: Url, + client: Client, + interval: Duration, request_timeout: Duration, -) { - let mut explorer_client = reqwest::Client::new(); - - let mut interval = tokio::time::interval(status_check_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - interval.tick().await; +} - let status = check_explorer_status( - &mut explorer_client, - explorer_url.clone(), - name.clone(), +impl ExplorerService { + pub fn new(url: Url, interval: Duration, request_timeout: Duration) -> Self { + Self { + url, + client: reqwest::Client::new(), + interval, request_timeout, - ) - .await; - - // Send the status update; exit if no receivers (shutdown signal) - if status_sender.send(status).is_err() { - info!("No receivers for explorer status updates, shutting down"); - return; } } } -/// Checks the status of the explorer service. -/// -/// This function checks the status of the explorer service. -/// -/// # GraphQL Query -/// -/// See [`LATEST_BLOCK_QUERY`] for the exact query string used. -/// -/// # Arguments -/// -/// * `explorer` - The explorer client. -/// * `name` - The name of the explorer. -/// * `url` - The URL of the explorer. -/// * `current_time` - The current time. -/// -/// # Returns -/// -/// A `ServiceStatus` containing the status of the explorer service. 
-#[instrument(target = COMPONENT, name = "check-status.explorer", skip_all, ret(level = "info"))] -pub(crate) async fn check_explorer_status( - explorer_client: &mut Client, - explorer_url: Url, - name: String, - request_timeout: Duration, -) -> ServiceStatus { - let resp = explorer_client - .post(explorer_url.clone()) - .json(&LATEST_BLOCK_REQUEST) - .timeout(request_timeout) - .send() - .await; - - let body = match resp { - Ok(resp) => match resp.text().await { - Ok(body) => body, - Err(e) => return ServiceStatus::error(&name, e), - }, - Err(e) => return ServiceStatus::error(&name, e), - }; - - let value: serde_json::Value = match serde_json::from_str(&body) { - Ok(value) => value, - Err(e) => { - return ServiceStatus::error(&name, format!("{e}: {body}")); - }, - }; - - match ExplorerStatusDetails::try_from(value) { - Ok(details) => ServiceStatus::healthy(name, ServiceDetails::ExplorerStatus(details)), - Err(e) => ServiceStatus::error(&name, e), +impl Service for ExplorerService { + fn name(&self) -> &'static str { + "Explorer" + } + + fn interval(&self) -> Duration { + self.interval + } + + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown( + self.name(), + ServiceDetails::ExplorerStatus(ExplorerStatusDetails::default()), + ) + } + + #[instrument(target = COMPONENT, name = "check-status.explorer", skip_all, ret(level = "info"))] + async fn check(&mut self) -> ServiceStatus { + let resp = self + .client + .post(self.url.clone()) + .json(&LATEST_BLOCK_REQUEST) + .timeout(self.request_timeout) + .send() + .await; + + let body = match resp { + Ok(resp) => match resp.text().await { + Ok(body) => body, + Err(e) => return ServiceStatus::error(self.name(), e), + }, + Err(e) => return ServiceStatus::error(self.name(), e), + }; + + let value: serde_json::Value = match serde_json::from_str(&body) { + Ok(value) => value, + Err(e) => return ServiceStatus::error(self.name(), format!("{e}: {body}")), + }; + + match ExplorerStatusDetails::try_from(value) { + 
Ok(details) => { + ServiceStatus::healthy(self.name(), ServiceDetails::ExplorerStatus(details)) + }, + Err(e) => ServiceStatus::error(self.name(), e), + } } } @@ -247,13 +212,6 @@ impl TryFrom for ExplorerStatusDetails { } } -pub(crate) fn initial_explorer_status() -> ServiceStatus { - ServiceStatus::unknown( - "Explorer", - ServiceDetails::ExplorerStatus(ExplorerStatusDetails::default()), - ) -} - // TESTS // ================================================================================================ diff --git a/bin/network-monitor/src/faucet.rs b/bin/network-monitor/src/faucet.rs index 75c223a9a..4f40af00c 100644 --- a/bin/network-monitor/src/faucet.rs +++ b/bin/network-monitor/src/faucet.rs @@ -12,12 +12,11 @@ use miden_protocol::testing::account_id::ACCOUNT_ID_SENDER; use reqwest::Client; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use tokio::sync::watch; -use tokio::time::MissedTickBehavior; use tracing::{debug, info, instrument, warn}; use url::Url; use crate::COMPONENT; +use crate::service::Service; use crate::status::{ServiceDetails, ServiceStatus}; // CONSTANTS @@ -74,79 +73,87 @@ pub struct GetMetadataResponse { // FAUCET TEST TASK // ================================================================================================ -/// Runs a task that continuously tests faucet functionality and updates a watch channel. -/// -/// This function spawns a task that periodically requests proof-of-work challenges from the faucet, -/// solves them, and submits token requests to verify the faucet is operational. -/// -/// # Arguments -/// -/// * `faucet_url` - The URL of the faucet service to test. -/// * `status_sender` - The sender for the watch channel. -/// * `test_interval` - The interval at which to test the faucet services. -/// -/// # Returns -/// -/// `Ok(())` if the task completes successfully, or an error if the task fails. 
-pub async fn run_faucet_test_task( - faucet_url: Url, - status_sender: watch::Sender, - test_interval: Duration, - request_timeout: Duration, -) { - let client = Client::builder() - .timeout(request_timeout) - .build() - .expect("Failed to create HTTP client with timeout"); - let mut success_count = 0u64; - let mut failure_count = 0u64; - let mut last_tx_id = None; - let mut last_error: Option; - let mut faucet_metadata = None; - - let mut interval = tokio::time::interval(test_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - interval.tick().await; +pub struct FaucetService { + url: Url, + client: Client, + interval: Duration, + success_count: u64, + failure_count: u64, + last_tx_id: Option, + faucet_metadata: Option, +} + +impl FaucetService { + pub fn new(url: Url, interval: Duration, request_timeout: Duration) -> Self { + let client = Client::builder() + .timeout(request_timeout) + .build() + .expect("Failed to create HTTP client with timeout"); + Self { + url, + client, + interval, + success_count: 0, + failure_count: 0, + last_tx_id: None, + faucet_metadata: None, + } + } +} + +impl Service for FaucetService { + fn name(&self) -> &'static str { + "Faucet" + } + + fn interval(&self) -> Duration { + self.interval + } + + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown( + self.name(), + ServiceDetails::FaucetTest(FaucetTestDetails { + url: self.url.to_string(), + test_duration_ms: 0, + success_count: 0, + failure_count: 0, + last_tx_id: None, + faucet_metadata: None, + }), + ) + } + async fn check(&mut self) -> ServiceStatus { let start_time = std::time::Instant::now(); + let mut last_error: Option = None; - match perform_faucet_test(&client, &faucet_url).await { + match perform_faucet_test(&self.client, &self.url).await { Ok((minted_tokens, metadata)) => { - success_count += 1; - last_tx_id = Some(minted_tokens.tx_id.clone()); - last_error = None; - faucet_metadata = Some(metadata); + self.success_count 
+= 1; + self.last_tx_id = Some(minted_tokens.tx_id.clone()); + self.faucet_metadata = Some(metadata); info!("Faucet test successful: tx_id={}", minted_tokens.tx_id); }, Err(e) => { - failure_count += 1; + self.failure_count += 1; last_error = Some(format!("{e:#}")); warn!("Faucet test failed: {}", e); }, } - let test_duration_ms = start_time.elapsed().as_millis() as u64; - let details = ServiceDetails::FaucetTest(FaucetTestDetails { - url: faucet_url.to_string(), - test_duration_ms, - success_count, - failure_count, - last_tx_id: last_tx_id.clone(), - faucet_metadata: faucet_metadata.clone(), + url: self.url.to_string(), + test_duration_ms: start_time.elapsed().as_millis() as u64, + success_count: self.success_count, + failure_count: self.failure_count, + last_tx_id: self.last_tx_id.clone(), + faucet_metadata: self.faucet_metadata.clone(), }); - let status = match &last_error { - Some(err) => ServiceStatus::unhealthy("Faucet", err, details), - None => ServiceStatus::healthy("Faucet", details), - }; - - // Send the status update; exit if no receivers (shutdown signal) - if status_sender.send(status).is_err() { - info!("No receivers for faucet status updates, shutting down"); - return; + match last_error { + Some(err) => ServiceStatus::unhealthy(self.name(), err, details), + None => ServiceStatus::healthy(self.name(), details), } } } diff --git a/bin/network-monitor/src/main.rs b/bin/network-monitor/src/main.rs index e0dba4b51..0290b5316 100644 --- a/bin/network-monitor/src/main.rs +++ b/bin/network-monitor/src/main.rs @@ -18,6 +18,7 @@ pub mod frontend; mod monitor; pub mod note_transport; pub mod remote_prover; +pub mod service; pub mod service_status; pub mod status; pub mod validator; diff --git a/bin/network-monitor/src/monitor/tasks.rs b/bin/network-monitor/src/monitor/tasks.rs index 422d85b30..b22c71f94 100644 --- a/bin/network-monitor/src/monitor/tasks.rs +++ b/bin/network-monitor/src/monitor/tasks.rs @@ -5,45 +5,24 @@ use std::sync::Arc; use 
std::sync::atomic::AtomicU64; use anyhow::Result; -use miden_node_proto::clients::{ - Builder as ClientBuilder, - RemoteProverProxyStatusClient, - RpcClient, -}; +use miden_node_proto::clients::RemoteProverClient; use tokio::sync::watch::Receiver; use tokio::sync::{Mutex, watch}; use tokio::task::{Id, JoinSet}; -use tracing::{debug, instrument}; -use url::Url; +use tracing::{debug, error}; use crate::COMPONENT; use crate::config::MonitorConfig; -use crate::counter::{LatencyState, run_counter_tracking_task, run_increment_task}; +use crate::counter::{CounterTrackingService, IncrementService, LatencyState}; use crate::deploy::ensure_accounts_exist; -use crate::explorer::{initial_explorer_status, run_explorer_status_task}; -use crate::faucet::{FaucetTestDetails, run_faucet_test_task}; +use crate::explorer::ExplorerService; +use crate::faucet::FaucetService; use crate::frontend::{ServerState, serve}; -use crate::note_transport::{initial_note_transport_status, run_note_transport_status_task}; -use crate::remote_prover::{ - ProofType, - generate_prover_test_payload, - merge_prover, - run_prover_combiner, - run_remote_prover_test_task, -}; -use crate::status::{ - CounterTrackingDetails, - IncrementDetails, - ServiceDetails, - ServiceStatus, - StaleChainTracker, - check_remote_prover_status, - check_rpc_status, - current_unix_timestamp_secs, - run_remote_prover_status_task, - run_rpc_status_task, -}; -use crate::validator::{initial_validator_status, run_validator_status_task}; +use crate::note_transport::NoteTransportService; +use crate::remote_prover::{ProbeSnapshot, ProverStatusService, generate_prover_test_payload}; +use crate::service::{Service, build_tls_client}; +use crate::status::{RpcService, ServiceStatus}; +use crate::validator::ValidatorService; /// Task management structure that encapsulates `JoinSet` and component names. #[derive(Default)] @@ -62,347 +41,95 @@ impl Tasks { } /// Spawn the RPC status checker task. 
- #[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.tasks.spawn_rpc_checker", - skip_all, - level = "info", - ret(level = "debug"), - err - )] - pub async fn spawn_rpc_checker( - &mut self, - config: &MonitorConfig, - ) -> Result> { - debug!(target: COMPONENT, rpc_url = %config.rpc_url, "Spawning RPC status checker task"); - - // Create initial status for RPC service - let mut rpc = ClientBuilder::new(config.rpc_url.clone()) - .with_tls() - .expect("TLS is enabled") - .with_timeout(config.request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); - - let current_time = current_unix_timestamp_secs(); - let mut stale_tracker = StaleChainTracker::new(config.stale_chain_tip_threshold); - let initial_rpc_status = check_rpc_status( - &mut rpc, - config.rpc_url.to_string(), - current_time, - &mut stale_tracker, - ) - .await; - - // Spawn the RPC checker - let (rpc_tx, rpc_rx) = watch::channel(initial_rpc_status); - let rpc_url = config.rpc_url.clone(); - let status_check_interval = config.status_check_interval; - let request_timeout = config.request_timeout; - let stale_chain_tip_threshold = config.stale_chain_tip_threshold; - let id = self - .handles - .spawn(async move { - run_rpc_status_task( - rpc_url, - rpc_tx, - status_check_interval, - request_timeout, - stale_chain_tip_threshold, - ) - .await; - }) - .id(); - self.names.insert(id, "rpc-checker".to_string()); - - debug!(target: COMPONENT, "RPC status checker task spawned successfully"); - Ok(rpc_rx) + pub fn spawn_rpc_checker(&mut self, config: &MonitorConfig) -> Receiver { + let svc = RpcService::new( + config.rpc_url.clone(), + config.status_check_interval, + config.request_timeout, + config.stale_chain_tip_threshold, + ); + self.spawn_service(svc) } /// Spawn the explorer status checker task. 
- #[instrument(target = COMPONENT, name = "tasks.spawn-explorer-checker", skip_all)] - pub async fn spawn_explorer_checker( - &mut self, - config: &MonitorConfig, - ) -> Result> { + pub fn spawn_explorer_checker(&mut self, config: &MonitorConfig) -> Receiver { let explorer_url = config.explorer_url.clone().expect("Explorer URL exists"); - let name = "Explorer".to_string(); - let status_check_interval = config.status_check_interval; - let request_timeout = config.request_timeout; - let (explorer_status_tx, explorer_status_rx) = watch::channel(initial_explorer_status()); - - let id = self - .handles - .spawn(async move { - run_explorer_status_task( - explorer_url, - name, - explorer_status_tx, - status_check_interval, - request_timeout, - ) - .await; - }) - .id(); - self.names.insert(id, "explorer-checker".to_string()); - - println!("Spawned explorer status checker task"); - - Ok(explorer_status_rx) + let svc = ExplorerService::new( + explorer_url, + config.status_check_interval, + config.request_timeout, + ); + self.spawn_service(svc) } /// Spawn the note transport status checker task. 
- #[instrument(target = COMPONENT, name = "tasks.spawn-note-transport-checker", skip_all)] - pub async fn spawn_note_transport_checker( + pub fn spawn_note_transport_checker( &mut self, config: &MonitorConfig, - ) -> Result> { + ) -> Receiver { let note_transport_url = config.note_transport_url.clone().expect("Note transport URL exists"); - let name = "Note Transport".to_string(); - let status_check_interval = config.status_check_interval; - let request_timeout = config.request_timeout; - let (tx, rx) = watch::channel(initial_note_transport_status()); - - let id = self - .handles - .spawn(async move { - run_note_transport_status_task( - note_transport_url, - name, - tx, - status_check_interval, - request_timeout, - ) - .await; - }) - .id(); - self.names.insert(id, "note-transport-checker".to_string()); - - println!("Spawned note transport status checker task"); - - Ok(rx) + let svc = NoteTransportService::new( + note_transport_url, + config.status_check_interval, + config.request_timeout, + ); + self.spawn_service(svc) } /// Spawn the validator status checker task. 
- #[instrument(target = COMPONENT, name = "tasks.spawn-validator-checker", skip_all)] - pub async fn spawn_validator_checker( - &mut self, - config: &MonitorConfig, - ) -> Result> { + pub fn spawn_validator_checker(&mut self, config: &MonitorConfig) -> Receiver { let validator_url = config.validator_url.clone().expect("Validator URL exists"); - let name = "Validator".to_string(); - let status_check_interval = config.status_check_interval; - let request_timeout = config.request_timeout; - let (tx, rx) = watch::channel(initial_validator_status()); - - let id = self - .handles - .spawn(async move { - run_validator_status_task( - validator_url, - name, - tx, - status_check_interval, - request_timeout, - ) - .await; - }) - .id(); - self.names.insert(id, "validator-checker".to_string()); - - println!("Spawned validator status checker task"); - - Ok(rx) + let svc = ValidatorService::new( + validator_url, + config.status_check_interval, + config.request_timeout, + ); + self.spawn_service(svc) } - /// Spawn prover status and test tasks for all configured provers. - #[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.tasks.spawn_prover_tasks", - skip_all, - level = "info", - ret(level = "debug"), - err - )] + /// Spawn prover status tasks for all configured provers. + /// + /// Each prover is monitored by a [`ProverStatusService`] that polls on the status cadence. + /// The first time it observes the prover reporting `ProofType::Transaction`, the status + /// service spawns a detached probe task that runs proof-test probes on the test cadence. 
pub async fn spawn_prover_tasks( &mut self, config: &MonitorConfig, - ) -> Result>> { - debug!(target: COMPONENT, prover_count = config.remote_prover_urls.len(), "Spawning prover tasks"); + ) -> Vec> { let mut prover_rxs = Vec::new(); for (i, prover_url) in config.remote_prover_urls.iter().enumerate() { - prover_rxs.push(self.spawn_single_prover(config, i, prover_url).await); - } - debug!(target: COMPONENT, spawned_provers = prover_rxs.len(), "All prover tasks spawned successfully"); - Ok(prover_rxs) - } - - /// Spawns the status checker, optional test task, and combiner task for a single prover. - /// Returns the receiver of the merged status channel. - async fn spawn_single_prover( - &mut self, - config: &MonitorConfig, - index: usize, - prover_url: &Url, - ) -> watch::Receiver { - let name = format!("Remote Prover ({})", index + 1); - - let mut remote_prover = ClientBuilder::new(prover_url.clone()) - .with_tls() - .expect("TLS is enabled") - .with_timeout(config.request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); - - let initial_prover_status = - check_remote_prover_status(&mut remote_prover, name.clone(), prover_url.to_string()) - .await; - - let (prover_status_tx, prover_status_rx) = watch::channel(initial_prover_status.clone()); - - // Spawn the remote prover status check task - let prover_url_clone = prover_url.clone(); - let name_clone = name.clone(); - let status_check_interval = config.status_check_interval; - let request_timeout = config.request_timeout; - let id = self - .handles - .spawn(async move { - run_remote_prover_status_task( - prover_url_clone, - name_clone, - prover_status_tx, - status_check_interval, - request_timeout, - ) - .await; - }) - .id(); - self.names.insert(id, format!("prover-checker-{}", index + 1)); - - // Extract proof_type directly from the service status. If the prover is not available - // during startup, skip spawning test tasks. 
- let proof_type = if let ServiceDetails::ProverStatusCheck(details) = - &initial_prover_status.details - { - Some(details.supported_proof_type.clone()) - } else { - tracing::warn!( - "Prover {name} is not available during startup, skipping test task initialization" - ); - None - }; - - // Only spawn test tasks for transaction provers; others get a dummy closed channel. - let prover_test_rx = if matches!(proof_type, Some(ProofType::Transaction)) { - debug!("Starting transaction proof tests for prover: {name}"); + let name = format!("Remote Prover ({})", i + 1); + let (probe_tx, probe_rx) = watch::channel(ProbeSnapshot::default()); + let test_client = + build_tls_client::(prover_url.clone(), config.request_timeout); let payload = generate_prover_test_payload().await; - let (prover_test_tx, prover_test_rx) = watch::channel(initial_prover_status.clone()); - - let prover_url_clone = prover_url.clone(); - let name_clone = name.clone(); - let proof_type = proof_type.expect("proof type is Some"); - let remote_prover_interval = config.remote_prover_test_interval; - let id = self - .handles - .spawn(async move { - run_remote_prover_test_task( - prover_url_clone, - &name_clone, - proof_type, - payload, - prover_test_tx, - request_timeout, - remote_prover_interval, - ) - .await; - }) - .id(); - self.names.insert(id, format!("prover-test-{}", index + 1)); - - prover_test_rx - } else { - debug!( - "Skipping prover tests for {name} (supports {proof_type:?} proofs, only testing Transaction proofs)" + let status_svc = ProverStatusService::new( + name, + prover_url.clone(), + config.status_check_interval, + config.request_timeout, + config.remote_prover_test_interval, + probe_tx, + probe_rx, + test_client, + payload, ); - let (_tx, rx) = watch::channel(initial_prover_status.clone()); - rx - }; - - // Spawn a combiner task that merges the status and test receivers into a single - // `ServiceStatus` channel, which is what `ServerState` consumes. 
- let initial_merged = merge_prover(&prover_status_rx.borrow(), &prover_test_rx.borrow()); - let (merged_tx, merged_rx) = watch::channel(initial_merged); - let id = self - .handles - .spawn(async move { - run_prover_combiner(prover_status_rx, prover_test_rx, merged_tx).await; - }) - .id(); - self.names.insert(id, format!("prover-combiner-{}", index + 1)); - - merged_rx + prover_rxs.push(self.spawn_service(status_svc)); + } + prover_rxs } /// Spawn the faucet testing task. - #[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.tasks.spawn_faucet", - skip_all, - level = "info", - ret(level = "debug") - )] pub fn spawn_faucet(&mut self, config: &MonitorConfig) -> Receiver { - // Create initial faucet test status - let initial_faucet_status = ServiceStatus::unknown( - "Faucet", - ServiceDetails::FaucetTest(FaucetTestDetails { - url: config.faucet_url.as_ref().expect("faucet URL exists").to_string(), - test_duration_ms: 0, - success_count: 0, - failure_count: 0, - last_tx_id: None, - faucet_metadata: None, - }), - ); - - // Spawn the faucet testing task - let (faucet_tx, faucet_rx) = watch::channel(initial_faucet_status); - // SAFETY: config.faucet_url is Some - let faucet_url = config.faucet_url.clone().unwrap(); - let faucet_test_interval = config.faucet_test_interval; - let request_timeout = config.request_timeout; - let id = self - .handles - .spawn(async move { - run_faucet_test_task(faucet_url, faucet_tx, faucet_test_interval, request_timeout) - .await; - }) - .id(); - self.names.insert(id, "faucet-test".to_string()); - - faucet_rx + let faucet_url = config.faucet_url.clone().expect("faucet URL exists"); + let svc = + FaucetService::new(faucet_url, config.faucet_test_interval, config.request_timeout); + self.spawn_service(svc) } /// Spawn the network transaction service checker task. 
- #[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.tasks.spawn_ntx_service", - skip_all, - level = "info", - ret(level = "debug"), - err - )] pub async fn spawn_ntx_service( &mut self, config: &MonitorConfig, @@ -414,117 +141,70 @@ impl Tasks { // Create shared atomic counter for tracking expected counter value let expected_counter_value = Arc::new(AtomicU64::new(0)); let latency_state = Arc::new(Mutex::new(LatencyState::default())); - let latency_state_for_increment = latency_state.clone(); - let latency_state_for_tracking = latency_state.clone(); - // Create initial increment status - let initial_increment_status = ServiceStatus::unknown( - "Local Transactions", - ServiceDetails::NtxIncrement(IncrementDetails { - success_count: 0, - failure_count: 0, - last_tx_id: None, - last_latency_blocks: None, - }), - ); + let increment_svc = IncrementService::new( + config.clone(), + Arc::clone(&expected_counter_value), + latency_state.clone(), + ) + .await?; + let tracking_svc = CounterTrackingService::new( + config.clone(), + Arc::clone(&expected_counter_value), + latency_state, + ) + .await?; - // Create initial tracking status - let initial_tracking_status = ServiceStatus::unknown( - "Network Transactions", - ServiceDetails::NtxTracking(CounterTrackingDetails { - current_value: None, - expected_value: None, - last_updated: None, - pending_increments: None, - }), - ); + let increment_rx = self.spawn_service(increment_svc); + let tracking_rx = self.spawn_service(tracking_svc); - // Spawn the increment task - let (increment_tx, increment_rx) = watch::channel(initial_increment_status); - let config_clone = config.clone(); - let counter_clone = Arc::clone(&expected_counter_value); - let increment_id = self - .handles - .spawn(async move { - Box::pin(run_increment_task( - config_clone, - increment_tx, - counter_clone, - latency_state_for_increment, - )) - .await - .expect("Counter increment task runs indefinitely"); - }) - .id(); - 
self.names.insert(increment_id, "counter-increment".to_string()); + Ok((increment_rx, tracking_rx)) + } - // Spawn the tracking task - let (tracking_tx, tracking_rx) = watch::channel(initial_tracking_status); - let config_clone = config.clone(); - let counter_clone = Arc::clone(&expected_counter_value); - let tracking_id = self + /// Spawns a [`Service`] and returns its `ServiceStatus` receiver. + /// + /// Seeds the `watch::channel` from [`Service::initial_status`] and hands the sender to + /// [`Service::run`] in a new task. The returned receiver is what [`ServerState`] consumes. + pub fn spawn_service(&mut self, svc: S) -> Receiver { + let (tx, rx) = watch::channel(svc.initial_status()); + let service_name = svc.name().to_string(); + let id = self .handles - .spawn(async move { - Box::pin(run_counter_tracking_task( - config_clone, - tracking_tx, - counter_clone, - latency_state_for_tracking, - )) - .await - .expect("Counter tracking task runs indefinitely"); + .spawn({ + let service_name = service_name.clone(); + async move { + if let Err(e) = svc.run(tx).await { + error!(target: COMPONENT, service = %service_name, err = ?e, "service exited with error"); + } + } }) .id(); - self.names.insert(tracking_id, "counter-tracking".to_string()); - - Ok((increment_rx, tracking_rx)) + debug!(target: COMPONENT, service = %service_name, "spawned service"); + self.names.insert(id, service_name); + rx } /// Spawn the HTTP frontend server. - #[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.tasks.spawn_http_server", - skip_all, - level = "info", - ret(level = "debug") - )] pub fn spawn_http_server(&mut self, server_state: ServerState, config: &MonitorConfig) { let config = config.clone(); let id = self.handles.spawn(async move { serve(server_state, config).await }).id(); self.names.insert(id, "frontend".to_string()); } - /// Wait for any task to complete or fail and return the result. 
- async fn join_next_with_id(&mut self) -> Option> { - self.handles.join_next_with_id().await - } - - /// Get the component name for a given task ID. - fn get_component_name(&self, id: Id) -> Option<&String> { - self.names.get(&id) - } - /// Handles the failure of a task. /// - /// This method waits for any task to complete or fail and returns an error. - /// Since we expect components to run indefinitely, any task completion is treated as fatal. - /// - /// # Returns - /// - /// An error if any task fails or completes unexpectedly. + /// Waits for any task to complete or fail and returns an error. Since components are + /// expected to run indefinitely, any task completion is treated as fatal. pub async fn handle_failure(&mut self) -> Result<()> { - // Wait for any task to complete or fail - let component_result = self.join_next_with_id().await.expect("join set is not empty"); + let component_result = + self.handles.join_next_with_id().await.expect("join set is not empty"); - // We expect components to run indefinitely, so we treat any return as fatal. 
let (id, err) = match component_result { Ok((id, ())) => (id, anyhow::anyhow!("component completed unexpectedly")), Err(join_err) => (join_err.id(), anyhow::Error::from(join_err)), }; - let component_name = self.get_component_name(id).map_or("unknown", String::as_str); + let component_name = self.names.get(&id).map_or("unknown", String::as_str); - // Exit with error context Err(err.context(format!("component {component_name} failed"))) } } diff --git a/bin/network-monitor/src/note_transport.rs b/bin/network-monitor/src/note_transport.rs index 7fc618243..24ad4b5c5 100644 --- a/bin/network-monitor/src/note_transport.rs +++ b/bin/network-monitor/src/note_transport.rs @@ -3,97 +3,88 @@ use std::time::Duration; -use tokio::sync::watch; -use tokio::time::MissedTickBehavior; use tonic::transport::{Channel, ClientTlsConfig}; use tonic_health::pb::health_client::HealthClient; use tonic_health::pb::{HealthCheckRequest, health_check_response}; -use tracing::{info, instrument}; +use tracing::instrument; use url::Url; use crate::COMPONENT; +use crate::service::Service; use crate::status::{NoteTransportStatusDetails, ServiceDetails, ServiceStatus}; -/// Creates a `tonic` channel for the given URL, enabling TLS for `https` schemes. -fn create_channel(url: &Url, timeout: Duration) -> Result { - let mut endpoint = Channel::from_shared(url.to_string()).expect("valid URL").timeout(timeout); +pub struct NoteTransportService { + url: Url, + client: HealthClient, + interval: Duration, +} - if url.scheme() == "https" { - endpoint = endpoint.tls_config(ClientTlsConfig::new().with_native_roots())?; +impl NoteTransportService { + pub fn new(url: Url, interval: Duration, timeout: Duration) -> Self { + let channel = create_channel(&url, timeout).expect("failed to create channel"); + let client = HealthClient::new(channel); + Self { url, client, interval } } - - Ok(endpoint.connect_lazy()) } -/// Runs a task that continuously checks note transport health and updates a watch channel. 
-pub async fn run_note_transport_status_task( - url: Url, - name: String, - status_sender: watch::Sender, - status_check_interval: Duration, - request_timeout: Duration, -) { - let channel = create_channel(&url, request_timeout).expect("failed to create channel"); - let mut health_client = HealthClient::new(channel); - - let mut interval = tokio::time::interval(status_check_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - interval.tick().await; - - let status = - check_note_transport_status(&mut health_client, url.to_string(), name.clone()).await; +impl Service for NoteTransportService { + fn name(&self) -> &'static str { + "Note Transport" + } - if status_sender.send(status).is_err() { - info!("No receivers for note transport status updates, shutting down"); - return; - } + fn interval(&self) -> Duration { + self.interval } -} -/// Checks the health of the note transport service via the standard gRPC Health Checking Protocol. -#[instrument( - target = COMPONENT, - name = "check-status.note-transport", - skip_all, - ret(level = "info") -)] -pub(crate) async fn check_note_transport_status( - health_client: &mut HealthClient, - url: String, - name: String, -) -> ServiceStatus { - let request = HealthCheckRequest { service: String::new() }; + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown( + self.name(), + ServiceDetails::NoteTransportStatus(NoteTransportStatusDetails::default()), + ) + } - match health_client.check(request).await { - Ok(response) => { - let serving_status = response.into_inner().status(); - let is_serving = serving_status == health_check_response::ServingStatus::Serving; - let serving_status_str = format!("{serving_status:?}"); + #[instrument( + target = COMPONENT, + name = "check-status.note-transport", + skip_all, + ret(level = "info") + )] + async fn check(&mut self) -> ServiceStatus { + let request = HealthCheckRequest { service: String::new() }; + let url = self.url.to_string(); - if 
is_serving { - let details = ServiceDetails::NoteTransportStatus(NoteTransportStatusDetails { - url, - serving_status: serving_status_str, - }); - ServiceStatus::healthy(name, details) - } else { - let error = format!("serving status: {serving_status_str}"); + match self.client.check(request).await { + Ok(response) => { + let serving_status = response.into_inner().status(); + let is_serving = serving_status == health_check_response::ServingStatus::Serving; + let serving_status_str = format!("{serving_status:?}"); let details = ServiceDetails::NoteTransportStatus(NoteTransportStatusDetails { url, - serving_status: serving_status_str, + serving_status: serving_status_str.clone(), }); - ServiceStatus::unhealthy(name, error, details) - } - }, - Err(e) => ServiceStatus::error(name, e), + + if is_serving { + ServiceStatus::healthy(self.name(), details) + } else { + ServiceStatus::unhealthy( + self.name(), + format!("serving status: {serving_status_str}"), + details, + ) + } + }, + Err(e) => ServiceStatus::error(self.name(), e), + } } } -pub(crate) fn initial_note_transport_status() -> ServiceStatus { - ServiceStatus::unknown( - "Note Transport", - ServiceDetails::NoteTransportStatus(NoteTransportStatusDetails::default()), - ) +/// Creates a `tonic` channel for the given URL, enabling TLS for `https` schemes. +fn create_channel(url: &Url, timeout: Duration) -> Result { + let mut endpoint = Channel::from_shared(url.to_string()).expect("valid URL").timeout(timeout); + + if url.scheme() == "https" { + endpoint = endpoint.tls_config(ClientTlsConfig::new().with_native_roots())?; + } + + Ok(endpoint.connect_lazy()) } diff --git a/bin/network-monitor/src/remote_prover.rs b/bin/network-monitor/src/remote_prover.rs index 795dd3397..3370d6307 100644 --- a/bin/network-monitor/src/remote_prover.rs +++ b/bin/network-monitor/src/remote_prover.rs @@ -1,12 +1,16 @@ -//! Remote transaction prover test functionality. +//! Remote prover monitoring: status polling and proof-test probing. //! 
-//! This module contains the logic for periodically testing remote transaction prover functionality -//! by sending mock transactions and checking for successful transaction proof generation. +//! A prover is monitored by up to two tasks: +//! - [`ProverStatusService`] (impl [`Service`]): polls the proxy status endpoint on the status +//! cadence and publishes the public [`ServiceStatus`] by merging in the latest probe outcome. +//! - [`run_prover_test`] (spawned lazily by the status service): runs proof-test probes on the +//! longer test cadence and publishes a private [`ProbeSnapshot`]. Only spawned the first time the +//! status service observes the prover reporting [`ProofType::Transaction`]. -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::Context; -use miden_node_proto::clients::{Builder as ClientBuilder, RemoteProverClient}; +use miden_node_proto::clients::{RemoteProverClient, RemoteProverProxyStatusClient}; use miden_node_proto::generated as proto; use miden_protocol::account::auth::AuthScheme; use miden_protocol::asset::{Asset, FungibleAsset}; @@ -19,13 +23,15 @@ use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::time::MissedTickBehavior; use tonic::Request; -use tracing::{info, instrument}; +use tracing::{debug, instrument}; use url::Url; use crate::COMPONENT; -use crate::status::{ +use crate::service::{Service, build_tls_client}; +use crate::service_status::{ ProverTestOutcome, RemoteProverDetails, + RemoteProverStatusDetails, ServiceDetails, ServiceStatus, Status, @@ -75,157 +81,260 @@ pub struct ProverTestDetails { pub proof_type: ProofType, } -// REMOTE TRANSACTION PROVER TEST TASK +// PROBE SNAPSHOT // ================================================================================================ -/// Runs a task that continuously tests remote prover functionality and updates a watch channel. 
-/// -/// This function spawns a task that periodically sends mock request payloads to a remote prover -/// and measures the success/failure rate and performance metrics for proof generation. -/// -/// # Arguments -/// -/// * `prover_url` - The URL of the remote prover service to test. -/// * `name` - The name of the remote prover. -/// * `proof_type` - The type of proof to test. -/// * `serialized_request_payload` - The serialized request payload to send to the remote prover. -/// * `status_sender` - The sender for the watch channel. -/// -/// # Returns -/// -/// `Ok(())` if the task completes successfully, or an error if the task fails. -pub async fn run_remote_prover_test_task( - prover_url: Url, - name: &str, - proof_type: ProofType, - serialized_request_payload: proto::remote_prover::ProofRequest, - status_sender: watch::Sender, - request_timeout: Duration, - test_interval: Duration, -) { - let mut client = ClientBuilder::new(prover_url) - .with_tls() - .expect("TLS is enabled") - .with_timeout(request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); +/// Private snapshot of the most recent probe result. Shared from the probe task to the status +/// service via a `watch` channel. +#[derive(Debug, Clone, Default)] +pub struct ProbeSnapshot { + pub latest: Option, + pub success_count: u64, + pub failure_count: u64, +} - let mut interval = tokio::time::interval(test_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); +// PROVER STATUS SERVICE +// ================================================================================================ - let mut success_count = 0u64; - let mut failure_count = 0u64; +/// Parameters captured at construction time for spawning the probe task lazily, the first +/// time the status service observes the prover reporting [`ProofType::Transaction`]. 
+struct ProbeSpawner { + client: RemoteProverClient, + payload: proto::remote_prover::ProofRequest, + interval: Duration, + probe_tx: watch::Sender, + name: String, +} - loop { - interval.tick().await; +/// Polls the remote prover's proxy status endpoint and publishes the combined +/// [`ServiceStatus`] (status + latest probe outcome). Spawns the probe task the first +/// time the prover reports Transaction type. +pub struct ProverStatusService { + name: String, + url: String, + client: RemoteProverProxyStatusClient, + interval: Duration, + last_status: Option, + last_status_err: Option, + probe_rx: watch::Receiver, + probe_spawner: Option, +} - let status = test_remote_prover( - &mut client, +impl ProverStatusService { + #[expect(clippy::too_many_arguments)] + pub fn new( + name: String, + prover_url: Url, + interval: Duration, + request_timeout: Duration, + probe_interval: Duration, + probe_tx: watch::Sender, + probe_rx: watch::Receiver, + test_client: RemoteProverClient, + payload: proto::remote_prover::ProofRequest, + ) -> Self { + let url = prover_url.to_string(); + let client = build_tls_client::(prover_url, request_timeout); + let probe_spawner = Some(ProbeSpawner { + client: test_client, + payload, + interval: probe_interval, + probe_tx, + name: name.clone(), + }); + Self { name, - &proof_type, - &serialized_request_payload, - &mut success_count, - &mut failure_count, - ) - .await; + url, + client, + interval, + last_status: None, + last_status_err: None, + probe_rx, + probe_spawner, + } + } - // Send the status update; exit if no receivers (shutdown signal) - if status_sender.send(status).is_err() { - info!("No receivers for remote prover status updates, shutting down"); + /// Spawns the probe task if the prover has just been observed to support Transaction + /// proofs and we haven't spawned it yet. No-op in all other cases. 
+ fn maybe_spawn_probe(&mut self) { + let Some(status) = &self.last_status else { return }; + if !matches!(status.supported_proof_type, ProofType::Transaction) { return; } + let Some(spawner) = self.probe_spawner.take() else { + return; + }; + debug!(target: COMPONENT, prover = %self.name, "spawning probe task"); + tokio::spawn(run_prover_test( + spawner.client, + spawner.payload, + spawner.interval, + spawner.probe_tx, + spawner.name, + )); + } + + /// Classifies the current status + probe state into a [`ServiceStatus`]. + fn build_status(&self, probe: &ProbeSnapshot) -> ServiceStatus { + let Some(status_details) = self.last_status.clone() else { + let msg = self.last_status_err.clone().unwrap_or_else(|| "discovering".to_string()); + let mut status = ServiceStatus::unknown(&self.name, ServiceDetails::Error); + status.error = Some(msg); + return status; + }; + + let details = ServiceDetails::RemoteProverStatus(RemoteProverDetails { + status: status_details.clone(), + test: probe.latest.clone(), + }); + + // Most recent status poll failed — report unhealthy but keep last known status details. 
+ if let Some(err) = &self.last_status_err { + return ServiceStatus::unhealthy(&self.name, err.clone(), details); + } + + if let Some(outcome) = &probe.latest { + if outcome.status == Status::Unhealthy { + let msg = outcome.error.clone().unwrap_or_else(|| "prover test failed".to_string()); + return ServiceStatus::unhealthy(&self.name, msg, details); + } + } + + let unhealthy_workers: Vec<_> = status_details + .workers + .iter() + .filter(|w| w.status != Status::Healthy) + .map(|w| w.name.clone()) + .collect(); + + if status_details.workers.is_empty() { + ServiceStatus::unknown(&self.name, details) + } else if !unhealthy_workers.is_empty() { + ServiceStatus::unhealthy( + &self.name, + format!("unhealthy workers: {}", unhealthy_workers.join(", ")), + details, + ) + } else { + ServiceStatus::healthy(&self.name, details) + } } } -/// Tests the remote prover by sending a mock request payload. -/// -/// This function sends a mock request payload to the remote prover and measures the response time -/// and success/failure rate for proof generation. -/// -/// # Arguments -/// -/// * `client` - The remote prover gRPC client. -/// * `name` - The name of the remote prover. -/// * `proof_type` - The type of proof to test. -/// * `serialized_request_payload` - The serialized request payload to send to the remote prover. -/// * `success_count` - Mutable reference to the success counter. -/// * `failure_count` - Mutable reference to the failure counter. -/// -/// # Returns -/// -/// A `ServiceStatus` containing the results of the proof test. 
+impl Service for ProverStatusService { + fn name(&self) -> &str { + &self.name + } + + fn interval(&self) -> Duration { + self.interval + } + + fn initial_status(&self) -> ServiceStatus { + self.build_status(&ProbeSnapshot::default()) + } + + #[instrument( + parent = None, + target = COMPONENT, + name = "network_monitor.prover.status_check", + skip_all, + level = "info", + ret(level = "debug"), + fields(prover = %self.name) + )] + async fn check(&mut self) -> ServiceStatus { + match self.client.status(()).await { + Ok(response) => { + self.last_status = Some(RemoteProverStatusDetails::from_proxy_status( + response.into_inner(), + self.url.clone(), + )); + self.last_status_err = None; + }, + Err(e) => { + debug!(target: COMPONENT, prover = %self.name, error = %e, "Remote prover status check failed"); + self.last_status_err = Some(e.to_string()); + }, + } + self.maybe_spawn_probe(); + let probe = self.probe_rx.borrow().clone(); + self.build_status(&probe) + } +} + +// PROBE TASK +// ================================================================================================ + +/// Runs proof-test probes on the configured interval. The task is spawned by +/// [`ProverStatusService::maybe_spawn_probe`] only after the prover has been observed to +/// support Transaction proofs. 
#[instrument( parent = None, target = COMPONENT, - name = "network_monitor.remote_prover.test_remote_prover", + name = "network_monitor.prover.run_test", skip_all, level = "info", - ret(level = "debug") + fields(prover = %name), )] -async fn test_remote_prover( - client: &mut miden_node_proto::clients::RemoteProverClient, - name: &str, - proof_type: &ProofType, - serialized_request_payload: &proto::remote_prover::ProofRequest, - success_count: &mut u64, - failure_count: &mut u64, -) -> ServiceStatus { - let start_time = std::time::Instant::now(); - - // Create the proof request - let request = Request::new(serialized_request_payload.clone()); - - // Send the request and measure the time - match client.prove(request).await { - Ok(response) => { - let duration = start_time.elapsed(); - let response_inner = response.into_inner(); - - *success_count += 1; - - ServiceStatus::healthy( - name, - ServiceDetails::ProverTestResult(ProverTestDetails { - test_duration_ms: duration.as_millis() as u64, - proof_size_bytes: response_inner.payload.len(), - success_count: *success_count, - failure_count: *failure_count, - proof_type: proof_type.clone(), - }), - ) - }, - Err(e) => { - *failure_count += 1; +async fn run_prover_test( + mut client: RemoteProverClient, + payload: proto::remote_prover::ProofRequest, + interval: Duration, + probe_tx: watch::Sender, + name: String, +) { + let mut timer = tokio::time::interval(interval); + timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut state = ProbeSnapshot::default(); - ServiceStatus::unhealthy( - name, - tonic_status_to_json(&e), - ServiceDetails::ProverTestResult(ProverTestDetails { - test_duration_ms: 0, - proof_size_bytes: 0, - success_count: *success_count, - failure_count: *failure_count, - proof_type: proof_type.clone(), - }), - ) - }, + loop { + timer.tick().await; + + let start = Instant::now(); + let request = Request::new(payload.clone()); + match client.prove(request).await { + Ok(response) => { + 
state.success_count += 1; + state.latest = Some(ProverTestOutcome { + details: ProverTestDetails { + test_duration_ms: start.elapsed().as_millis() as u64, + proof_size_bytes: response.into_inner().payload.len(), + success_count: state.success_count, + failure_count: state.failure_count, + proof_type: ProofType::Transaction, + }, + status: Status::Healthy, + error: None, + }); + }, + Err(e) => { + state.failure_count += 1; + state.latest = Some(ProverTestOutcome { + details: ProverTestDetails { + test_duration_ms: 0, + proof_size_bytes: 0, + success_count: state.success_count, + failure_count: state.failure_count, + proof_type: ProofType::Transaction, + }, + status: Status::Unhealthy, + error: Some(tonic_status_to_json(&e)), + }); + }, + } + + if probe_tx.send(state.clone()).is_err() { + debug!(target: COMPONENT, prover = %name, "probe channel closed, exiting probe task"); + return; + } } } +// TONIC STATUS TO JSON +// ================================================================================================ + /// Converts a `tonic::Status` error to a JSON string with structured error information. -/// -/// This function extracts the code, message, details, and metadata from a `tonic::Status` -/// error and serializes them into a JSON string for structured error reporting. -/// -/// # Arguments -/// -/// * `status` - The `tonic::Status` error to convert. -/// -/// # Returns -/// -/// A JSON string containing the structured error information. fn tonic_status_to_json(status: &tonic::Status) -> String { let error_json = serde_json::json!({ "code": format!("{:?}", status.code()), @@ -256,10 +365,6 @@ fn tonic_status_to_json(status: &tonic::Status) -> String { // ================================================================================================ /// Generates a mock transaction for testing remote prover functionality. -/// -/// This function creates a mock transaction using `MockChainBuilder` similar to what's done -/// in the remote prover tests. 
The transaction is generated once and can be reused for -/// multiple proof test calls. #[instrument( parent = None, target = COMPONENT, @@ -269,17 +374,15 @@ fn tonic_status_to_json(status: &tonic::Status) -> String { ret(level = "debug"), err )] -pub async fn generate_mock_transaction() -> anyhow::Result { +async fn generate_mock_transaction() -> anyhow::Result { let mut mock_chain_builder = MockChainBuilder::new(); - // Create an account with basic authentication let account = mock_chain_builder .add_existing_wallet(Auth::BasicAuth { auth_scheme: AuthScheme::Falcon512Poseidon2, }) .context("Failed to add wallet to mock chain")?; - // Create a fungible asset let fungible_asset: Asset = FungibleAsset::new( ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET .try_into() @@ -289,7 +392,6 @@ pub async fn generate_mock_transaction() -> anyhow::Result { .context("Failed to create fungible asset")? .into(); - // Create a P2ID note let note = mock_chain_builder .add_p2id_note( ACCOUNT_ID_SENDER.try_into().context("Failed to convert sender account ID")?, @@ -299,17 +401,14 @@ pub async fn generate_mock_transaction() -> anyhow::Result { ) .context("Failed to add P2ID note")?; - // Build the mock chain let mock_chain = mock_chain_builder.build().context("Failed to build mock chain")?; - // Build transaction context let tx_context = mock_chain .build_tx_context(account.id(), &[note.id()], &[]) .context("Failed to build transaction context")? 
.build() .context("Failed to build transaction")?; - // Execute the transaction let executed_transaction = tx_context.execute().await.context("Failed to execute transaction")?; Ok(executed_transaction.into()) @@ -332,83 +431,3 @@ pub(crate) async fn generate_prover_test_payload() -> proto::remote_prover::Proo payload: generate_mock_transaction().await.unwrap().to_bytes(), } } - -// PROVER MERGE -// ================================================================================================ - -/// Merges the status and test receivers for a single remote prover into one `ServiceStatus`. -/// -/// The combined status is `Unhealthy` if either the status check or the test failed, `Unknown` -/// if the status checker has not yet seen the prover, and `Healthy` otherwise. The test result -/// is only attached when the test task has produced an actual `ProverTestResult` value (before -/// the first test completes, the test channel holds the initial prover status and should not be -/// surfaced as a test). -pub(crate) fn merge_prover(status: &ServiceStatus, test: &ServiceStatus) -> ServiceStatus { - // If the status checker hasn't produced a real result (prover is down), pass its error - // status straight through. - let ServiceDetails::ProverStatusCheck(status_details) = &status.details else { - return status.clone(); - }; - - // Attach test info only once the test task has produced a real result. 
- let test_outcome = if let ServiceDetails::ProverTestResult(d) = &test.details { - Some(ProverTestOutcome { - details: d.clone(), - status: test.status.clone(), - }) - } else { - None - }; - let test_unhealthy = test_outcome.as_ref().is_some_and(|t| t.status == Status::Unhealthy); - let test_error = test_outcome.as_ref().and_then(|_| test.error.clone()); - - let details = ServiceDetails::RemoteProverStatus(RemoteProverDetails { - status: status_details.clone(), - test: test_outcome, - }); - - let result = if status.status == Status::Unhealthy || test_unhealthy { - let err = status - .error - .clone() - .or(test_error) - .unwrap_or_else(|| "prover is unhealthy".to_string()); - ServiceStatus::unhealthy(&status.name, err, details) - } else if status.status == Status::Unknown { - ServiceStatus::unknown(&status.name, details) - } else { - ServiceStatus::healthy(&status.name, details) - }; - - result.with_last_checked(status.last_checked) -} - -/// Watches a prover's status and test channels, publishing the merged [`ServiceStatus`] on every -/// upstream change. -/// -/// Exits when the output channel has no receivers or when the upstream status channel's sender is -/// dropped. If the test channel's sender is dropped (e.g. a non-transaction prover with no test -/// task), the combiner falls back to watching the status channel only. -pub(crate) async fn run_prover_combiner( - mut status_rx: watch::Receiver, - mut test_rx: watch::Receiver, - merged_tx: watch::Sender, -) { - let mut test_alive = true; - loop { - let merged = merge_prover(&status_rx.borrow(), &test_rx.borrow()); - if merged_tx.send(merged).is_err() { - info!("No receivers for merged prover status updates, shutting down"); - return; - } - - if test_alive { - tokio::select! 
{
-                r = status_rx.changed() => if r.is_err() { return; },
-                r = test_rx.changed() => if r.is_err() { test_alive = false; },
-            }
-        } else if status_rx.changed().await.is_err() {
-            return;
-        }
-    }
-}
diff --git a/bin/network-monitor/src/service.rs b/bin/network-monitor/src/service.rs
new file mode 100644
index 000000000..5335b7e19
--- /dev/null
+++ b/bin/network-monitor/src/service.rs
@@ -0,0 +1,71 @@
+//! Service trait shared by all network monitor checker tasks.
+//!
+//! Every service (RPC, explorer, faucet, provers, etc.) implements [`Service`]. The default
+//! [`Service::run`] gives a standard interval-based check loop with shutdown detection.
+//!
+//! The [`Tasks::spawn_service`](crate::monitor::tasks::Tasks::spawn_service) helper takes any
+//! service, seeds its `watch::channel` with [`Service::initial_status`], spawns the task, and
+//! returns the receiver.
+
+use std::time::Duration;
+
+use anyhow::Result;
+use miden_node_proto::clients::{Builder as ClientBuilder, GrpcClient};
+use tokio::sync::watch;
+use tokio::time::MissedTickBehavior;
+use tracing::info;
+use url::Url;
+
+use crate::service_status::ServiceStatus;
+
+/// Build a lazily-connected gRPC client using the network monitor's standard settings
+/// (TLS enabled, no metadata, no OTEL propagation).
+pub fn build_tls_client<C: GrpcClient>(url: Url, timeout: Duration) -> C {
+    ClientBuilder::new(url)
+        .with_tls()
+        .expect("TLS is enabled")
+        .with_timeout(timeout)
+        .without_metadata_version()
+        .without_metadata_genesis()
+        .without_otel_context_injection()
+        .connect_lazy::<C>()
+}
+
+/// A monitor checker that periodically produces [`ServiceStatus`] updates.
+pub trait Service: Send + 'static {
+    /// Human-readable service name.
+    fn name(&self) -> &str;
+
+    /// Interval between [`Self::check`] invocations.
+    fn interval(&self) -> Duration;
+
+    /// Value used to seed the `watch::channel` at spawn time.
+    fn initial_status(&self) -> ServiceStatus;
+
+    /// Runs a single check iteration.
+    fn check(&mut self) -> impl std::future::Future<Output = ServiceStatus> + Send;
+
+    /// Full service lifecycle. The default implementation loops on [`Self::interval`] ticks,
+    /// calls [`Self::check`], and publishes the result. Returns when the channel has no
+    /// receivers (clean shutdown). Services with custom scheduling override this.
+    fn run(
+        mut self,
+        tx: watch::Sender<ServiceStatus>,
+    ) -> impl std::future::Future<Output = Result<()>> + Send
+    where
+        Self: Sized,
+    {
+        async move {
+            let mut interval = tokio::time::interval(self.interval());
+            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+            loop {
+                interval.tick().await;
+                let status = self.check().await;
+                if tx.send(status).is_err() {
+                    info!("No receivers for {}, shutting down", self.name());
+                    return Ok(());
+                }
+            }
+        }
+    }
+}
diff --git a/bin/network-monitor/src/service_status.rs b/bin/network-monitor/src/service_status.rs
index 65281cb11..4863b857c 100644
--- a/bin/network-monitor/src/service_status.rs
+++ b/bin/network-monitor/src/service_status.rs
@@ -129,12 +129,7 @@ impl ServiceStatus {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum ServiceDetails {
     RpcStatus(RpcStatusDetails),
-    /// Remote prover status combined with its most recent test result.
     RemoteProverStatus(RemoteProverDetails),
-    /// Internal: raw output of a remote prover status check task.
-    ProverStatusCheck(RemoteProverStatusDetails),
-    /// Internal: raw output of a remote prover test task.
-    ProverTestResult(ProverTestDetails),
     FaucetTest(FaucetTestDetails),
     NtxIncrement(IncrementDetails),
     NtxTracking(CounterTrackingDetails),
@@ -156,6 +151,8 @@ pub struct RemoteProverDetails {
 pub struct ProverTestOutcome {
     pub details: ProverTestDetails,
     pub status: Status,
+    /// Error message from the most recent test attempt; `None` when the last attempt succeeded.
+    pub error: Option<String>,
 }
 
 /// Details of the increment service.
diff --git a/bin/network-monitor/src/status.rs b/bin/network-monitor/src/status.rs index d0a6724d8..1be65ce45 100644 --- a/bin/network-monitor/src/status.rs +++ b/bin/network-monitor/src/status.rs @@ -7,17 +7,12 @@ use std::time::Duration; -use miden_node_proto::clients::{ - Builder as ClientBuilder, - RemoteProverProxyStatusClient, - RpcClient, -}; -use tokio::sync::watch; -use tokio::time::MissedTickBehavior; -use tracing::{debug, info, instrument}; +use miden_node_proto::clients::RpcClient; +use tracing::{debug, instrument}; use url::Url; use crate::COMPONENT; +use crate::service::{Service, build_tls_client}; pub use crate::service_status::*; // STALE CHAIN TIP TRACKER @@ -75,243 +70,95 @@ impl StaleChainTracker { // RPC STATUS CHECKER // ================================================================================================ -/// Runs a task that continuously checks RPC status and updates a watch channel. -/// -/// This function spawns a task that periodically checks the RPC service status -/// and sends updates through a watch channel. It also detects stale chain tips -/// and marks the RPC as unhealthy if the chain tip hasn't changed for longer -/// than the configured threshold. -/// -/// # Arguments -/// -/// * `rpc_url` - The URL of the RPC service. -/// * `status_sender` - The sender for the watch channel. -/// * `status_check_interval` - The interval at which to check the status of the services. -/// * `request_timeout` - The timeout for outgoing requests. -/// * `stale_chain_tip_threshold` - Maximum time without a chain tip update before marking as -/// unhealthy. -/// -/// # Returns -/// -/// `Ok(())` if the task completes successfully, or an error if the task fails. 
-pub async fn run_rpc_status_task( - rpc_url: Url, - status_sender: watch::Sender, - status_check_interval: Duration, - request_timeout: Duration, - stale_chain_tip_threshold: Duration, -) { - let url_str = rpc_url.to_string(); - let mut rpc = ClientBuilder::new(rpc_url) - .with_tls() - .expect("TLS is enabled") - .with_timeout(request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); - - let mut interval = tokio::time::interval(status_check_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut stale_tracker = StaleChainTracker::new(stale_chain_tip_threshold); - - loop { - interval.tick().await; - - let current_time = current_unix_timestamp_secs(); - - let status = - check_rpc_status(&mut rpc, url_str.clone(), current_time, &mut stale_tracker).await; +pub struct RpcService { + url: String, + rpc: RpcClient, + stale_tracker: StaleChainTracker, + interval: Duration, +} - // Send the status update; exit if no receivers (shutdown signal) - if status_sender.send(status).is_err() { - info!("No receivers for RPC status updates, shutting down"); - return; +impl RpcService { + pub fn new( + rpc_url: Url, + interval: Duration, + request_timeout: Duration, + stale_threshold: Duration, + ) -> Self { + let url = rpc_url.to_string(); + let rpc = build_tls_client::(rpc_url, request_timeout); + Self { + url, + rpc, + stale_tracker: StaleChainTracker::new(stale_threshold), + interval, } } } -/// Checks the status of the RPC service. -/// -/// This function checks the status of the RPC service and detects stale chain tips. -/// If the chain tip hasn't changed for longer than the configured threshold, the RPC -/// is marked as unhealthy. -/// -/// # Arguments -/// -/// * `rpc` - The RPC client. -/// * `url` - The URL of the RPC service. -/// * `current_time` - The current time. -/// * `stale_tracker` - Tracker for detecting stale chain tips. 
-/// -/// # Returns -/// -/// A `ServiceStatus` containing the status of the RPC service. -#[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.status.check_rpc_status", - skip_all, - level = "info", - ret(level = "debug") -)] -pub(crate) async fn check_rpc_status( - rpc: &mut miden_node_proto::clients::RpcClient, - url: String, - current_time: u64, - stale_tracker: &mut StaleChainTracker, -) -> ServiceStatus { - match rpc.status(()).await { - Ok(response) => { - let status = response.into_inner(); - let rpc_details = RpcStatusDetails::from_rpc_status(status, url); - - // Check for stale chain tip using the store's chain tip - if let Some(store_status) = &rpc_details.store_status { - if let Some(stale_duration) = - stale_tracker.update(store_status.chain_tip, current_time) - { - debug!( - target: COMPONENT, - chain_tip = store_status.chain_tip, - stale_duration_secs = stale_duration, - "Chain tip is stale" - ); - return ServiceStatus::unhealthy( - "RPC", - format!( - "Chain tip {} has not changed for {} seconds", - store_status.chain_tip, stale_duration - ), - ServiceDetails::RpcStatus(rpc_details), - ); - } - } - - ServiceStatus::healthy("RPC", ServiceDetails::RpcStatus(rpc_details)) - }, - Err(e) => { - debug!(target: COMPONENT, error = %e, "RPC status check failed"); - ServiceStatus::error("RPC", e) - }, +impl Service for RpcService { + fn name(&self) -> &'static str { + "RPC" } -} - -// REMOTE PROVER STATUS CHECKER -// ================================================================================================ -/// Runs a task that continuously checks remote prover status and updates a watch channel. -/// -/// This function spawns a task that periodically checks a remote prover service status -/// and sends updates through a watch channel. -/// -/// # Arguments -/// -/// * `prover_url` - The URL of the remote prover service. -/// * `name` - The name of the remote prover. -/// * `status_sender` - The sender for the watch channel. 
-/// * `status_check_interval` - The interval at which to check the status of the services. -/// -/// # Returns -/// -/// `Ok(())` if the monitoring task runs and completes successfully, or an error if there are -/// connection issues or failures while checking the remote prover status. -pub async fn run_remote_prover_status_task( - prover_url: Url, - name: String, - status_sender: watch::Sender, - status_check_interval: Duration, - request_timeout: Duration, -) { - let url_str = prover_url.to_string(); - let mut remote_prover = ClientBuilder::new(prover_url) - .with_tls() - .expect("TLS is enabled") - .with_timeout(request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); - - let mut interval = tokio::time::interval(status_check_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - interval.tick().await; - - let status = - check_remote_prover_status(&mut remote_prover, name.clone(), url_str.clone()).await; - - // Send the status update; exit if no receivers (shutdown signal) - if status_sender.send(status).is_err() { - info!("No receivers for remote prover status updates, shutting down"); - return; - } + fn interval(&self) -> Duration { + self.interval } -} - -/// Checks the status of the remote prover service. -/// -/// This function checks the status of the remote prover service. -/// -/// # Arguments -/// -/// * `remote_prover` - The remote prover client. -/// * `name` - The name of the remote prover. -/// * `url` - The URL of the remote prover. -/// -/// # Returns -/// -/// A `ServiceStatus` containing the status of the remote prover service. 
-#[instrument( - parent = None, - target = COMPONENT, - name = "network_monitor.status.check_remote_prover_status", - skip_all, - level = "info", - ret(level = "debug") -)] -pub(crate) async fn check_remote_prover_status( - remote_prover: &mut miden_node_proto::clients::RemoteProverProxyStatusClient, - display_name: String, - url: String, -) -> ServiceStatus { - match remote_prover.status(()).await { - Ok(response) => { - let status = response.into_inner(); - // Use the new method to convert gRPC status to domain type - let remote_prover_details = RemoteProverStatusDetails::from_proxy_status(status, url); + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown( + self.name(), + ServiceDetails::RpcStatus(RpcStatusDetails { + url: self.url.clone(), + version: String::new(), + genesis_commitment: None, + store_status: None, + block_producer_status: None, + }), + ) + } - // Determine overall health based on worker statuses. - // All workers must be healthy for the prover to be considered healthy. 
- let no_workers = remote_prover_details.workers.is_empty(); - let all_healthy = - remote_prover_details.workers.iter().all(|w| w.status == Status::Healthy); - let unhealthy_worker_names: Vec<_> = remote_prover_details - .workers - .iter() - .filter(|w| w.status != Status::Healthy) - .map(|w| w.name.clone()) - .collect(); - let details = ServiceDetails::ProverStatusCheck(remote_prover_details); + #[instrument( + parent = None, + target = COMPONENT, + name = "network_monitor.status.check_rpc", + skip_all, + level = "info", + ret(level = "debug") + )] + async fn check(&mut self) -> ServiceStatus { + match self.rpc.status(()).await { + Ok(response) => { + let rpc_details = + RpcStatusDetails::from_rpc_status(response.into_inner(), self.url.clone()); + + if let Some(store_status) = &rpc_details.store_status { + if let Some(stale_duration) = self + .stale_tracker + .update(store_status.chain_tip, current_unix_timestamp_secs()) + { + debug!( + target: COMPONENT, + chain_tip = store_status.chain_tip, + stale_duration_secs = stale_duration, + "Chain tip is stale" + ); + return ServiceStatus::unhealthy( + self.name(), + format!( + "Chain tip {} has not changed for {} seconds", + store_status.chain_tip, stale_duration + ), + ServiceDetails::RpcStatus(rpc_details), + ); + } + } - if no_workers { - ServiceStatus::unknown(display_name, details) - } else if all_healthy { - ServiceStatus::healthy(display_name, details) - } else { - ServiceStatus::unhealthy( - display_name, - format!("unhealthy workers: {}", unhealthy_worker_names.join(", ")), - details, - ) - } - }, - Err(e) => { - debug!(target: COMPONENT, prover_name = %display_name, error = %e, "Remote prover status check failed"); - ServiceStatus::error(display_name, e) - }, + ServiceStatus::healthy(self.name(), ServiceDetails::RpcStatus(rpc_details)) + }, + Err(e) => { + debug!(target: COMPONENT, error = %e, "RPC status check failed"); + ServiceStatus::error(self.name(), e) + }, + } } } diff --git 
a/bin/network-monitor/src/validator.rs b/bin/network-monitor/src/validator.rs index 2376ebd47..21f77b1ba 100644 --- a/bin/network-monitor/src/validator.rs +++ b/bin/network-monitor/src/validator.rs @@ -3,81 +3,60 @@ use std::time::Duration; -use miden_node_proto::clients::{Builder as ClientBuilder, ValidatorClient}; -use tokio::sync::watch; -use tokio::time::MissedTickBehavior; -use tracing::{info, instrument}; +use miden_node_proto::clients::ValidatorClient; +use tracing::instrument; use url::Url; use crate::COMPONENT; +use crate::service::{Service, build_tls_client}; use crate::status::{ServiceDetails, ServiceStatus, ValidatorStatusDetails}; -/// Runs a task that continuously checks validator status and updates a watch channel. -pub async fn run_validator_status_task( +pub struct ValidatorService { url: Url, - name: String, - status_sender: watch::Sender, - status_check_interval: Duration, - request_timeout: Duration, -) { - let mut validator = ClientBuilder::new(url.clone()) - .with_tls() - .expect("TLS is enabled") - .with_timeout(request_timeout) - .without_metadata_version() - .without_metadata_genesis() - .without_otel_context_injection() - .connect_lazy::(); - - let mut interval = tokio::time::interval(status_check_interval); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - interval.tick().await; - - let status = check_validator_status(&mut validator, &url, name.clone()).await; + client: ValidatorClient, + interval: Duration, +} - if status_sender.send(status).is_err() { - info!("No receivers for validator status updates, shutting down"); - return; - } +impl ValidatorService { + pub fn new(url: Url, interval: Duration, timeout: Duration) -> Self { + let client = build_tls_client::(url.clone(), timeout); + Self { url, client, interval } } } -/// Checks the status of the validator service via its gRPC Status endpoint. 
-#[instrument( - target = COMPONENT, - name = "check-status.validator", - skip_all, - ret(level = "info") -)] -pub(crate) async fn check_validator_status( - validator: &mut ValidatorClient, - url: &Url, - name: String, -) -> ServiceStatus { - match validator.status(()).await { - Ok(response) => { - let status = response.into_inner(); +impl Service for ValidatorService { + fn name(&self) -> &'static str { + "Validator" + } - ServiceStatus::healthy( - name, - ServiceDetails::ValidatorStatus(ValidatorStatusDetails { - url: url.to_string(), - version: status.version, - chain_tip: status.chain_tip, - validated_transactions_count: status.validated_transactions_count, - signed_blocks_count: status.signed_blocks_count, - }), - ) - }, - Err(e) => ServiceStatus::error(name, e), + fn interval(&self) -> Duration { + self.interval + } + + fn initial_status(&self) -> ServiceStatus { + ServiceStatus::unknown( + self.name(), + ServiceDetails::ValidatorStatus(ValidatorStatusDetails::default()), + ) } -} -pub(crate) fn initial_validator_status() -> ServiceStatus { - ServiceStatus::unknown( - "Validator", - ServiceDetails::ValidatorStatus(ValidatorStatusDetails::default()), - ) + #[instrument(target = COMPONENT, name = "check-status.validator", skip_all, ret(level = "info"))] + async fn check(&mut self) -> ServiceStatus { + match self.client.status(()).await { + Ok(response) => { + let status = response.into_inner(); + ServiceStatus::healthy( + self.name(), + ServiceDetails::ValidatorStatus(ValidatorStatusDetails { + url: self.url.to_string(), + version: status.version, + chain_tip: status.chain_tip, + validated_transactions_count: status.validated_transactions_count, + signed_blocks_count: status.signed_blocks_count, + }), + ) + }, + Err(e) => ServiceStatus::error(self.name(), e), + } + } }