diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index 7000f131d..0afc56a1a 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -46,10 +46,13 @@ impl Db { .await .map_err(|e| DatabaseError::ConnectionPoolObtainError(Box::new(e)))?; - conn.interact(|conn| <_ as diesel::Connection>::transaction::(conn, query)) - .in_current_span() - .await - .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? + let span = tracing::Span::current(); + conn.interact(move |conn| { + let _guard = span.enter(); + <_ as diesel::Connection>::transaction::(conn, query) + }) + .await + .map_err(|err| E::from(DatabaseError::interact(&msg.to_string(), &err)))? } /// Run the query _without_ a transaction diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 86581a9d9..5f115e53d 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -561,12 +561,17 @@ impl Db { // XXX FIXME TODO free floating mutex MUST NOT exist // it doesn't bind it properly to the data locked! - if allow_acquire.send(()).is_err() { - tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock"); + { + let _span = tracing::info_span!(target: COMPONENT, "acquire_write_lock").entered(); + if allow_acquire.send(()).is_err() { + tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock"); + } } models::queries::prune_history(conn, signed_block.header().block_num())?; + let _span = + tracing::info_span!(target: COMPONENT, "acquire_done_lock").entered(); acquire_done.blocking_recv()?; Ok(()) diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs index d8aab21e7..8a8884a9c 100644 --- a/crates/store/src/state/apply_block.rs +++ b/crates/store/src/state/apply_block.rs @@ -2,9 +2,12 @@ use std::sync::Arc; use miden_node_proto::BlockProofRequest; use miden_node_utils::ErrorReport; +use miden_protocol::Word; use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::SignedBlock; -use miden_protocol::note::NoteDetails; +use miden_protocol::block::account_tree::AccountMutationSet; +use miden_protocol::block::nullifier_tree::NullifierMutationSet; +use miden_protocol::block::{BlockBody, BlockHeader, SignedBlock}; +use miden_protocol::note::{NoteDetails, Nullifier}; use miden_protocol::transaction::OutputNote; use miden_protocol::utils::serde::Serializable; use tokio::sync::oneshot; @@ -44,7 +47,6 @@ impl State { /// /// Returns an error if `proving_inputs` is `None` and the block is not the genesis block. // TODO: This span is logged in a root span, we should connect it to the parent span. - #[expect(clippy::too_many_lines)] #[instrument(target = COMPONENT, skip_all, err)] pub async fn apply_block( &self, @@ -56,36 +58,10 @@ impl State { let header = signed_block.header(); let body = signed_block.body(); - // Validate that header and body match. - let tx_commitment = body.transactions().commitment(); - if header.tx_commitment() != tx_commitment { - return Err(InvalidBlockError::InvalidBlockTxCommitment { - expected: tx_commitment, - actual: header.tx_commitment(), - } - .into()); - } - let block_num = header.block_num(); let block_commitment = header.commitment(); - // Validate that the applied block is the next block in sequence. - let prev_block = self - .db - .select_block_header_by_block_num(None) - .await? - .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?; - let expected_block_num = prev_block.block_num().child(); - if block_num != expected_block_num { - return Err(InvalidBlockError::NewBlockInvalidBlockNum { - expected: expected_block_num, - submitted: block_num, - } - .into()); - } - if header.prev_block_commitment() != prev_block.commitment() { - return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into()); - } + self.validate_block_header(header, body).await?; // Save the block to the block store. In a case of a rolled-back DB transaction, the // in-memory state will be unchanged, but the block might still be written into the @@ -98,117 +74,14 @@ impl State { async move { store.save_block(block_num, &signed_block_bytes).await }.in_current_span(), ); - // Scope to read in-memory data, compute mutations required for updating account - // and nullifier trees, and validate the request. let ( nullifier_tree_old_root, nullifier_tree_update, account_tree_old_root, account_tree_update, - ) = { - let inner = self.inner.read().await; - - let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered(); - - // nullifiers can be produced only once - let duplicate_nullifiers: Vec<_> = body - .created_nullifiers() - .iter() - .filter(|&nullifier| inner.nullifier_tree.get_block_num(nullifier).is_some()) - .copied() - .collect(); - if !duplicate_nullifiers.is_empty() { - return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into()); - } - - // compute updates for the in-memory data structures - - // new_block.chain_root must be equal to the chain MMR root prior to the update - let peaks = inner.blockchain.peaks(); - if peaks.hash_peaks() != header.chain_commitment() { - return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into()); - } - - // compute update for nullifier tree - let nullifier_tree_update = inner - .nullifier_tree - .compute_mutations( - body.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)), - ) - .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?; - - if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() { - // We do our best here to notify the serve routine, if it doesn't care (dropped the - // receiver) we can't do much. - let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( - InvalidBlockError::NewBlockInvalidNullifierRoot, - )); - return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into()); - } - - // compute update for account tree - let account_tree_update = inner - .account_tree - .compute_mutations( - body.updated_accounts() - .iter() - .map(|update| (update.account_id(), update.final_state_commitment())), - ) - .map_err(|e| match e { - HistoricalError::AccountTreeError(err) => { - InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err) - }, - HistoricalError::MerkleError(_) => { - panic!("Unexpected MerkleError during account tree mutation computation") - }, - })?; - - if account_tree_update.as_mutation_set().root() != header.account_root() { - let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( - InvalidBlockError::NewBlockInvalidAccountRoot, - )); - return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into()); - } - - ( - inner.nullifier_tree.root(), - nullifier_tree_update, - inner.account_tree.root_latest(), - account_tree_update, - ) - }; + ) = self.compute_tree_mutations(header, body).await?; - // Build note tree. - let note_tree = body.compute_block_note_tree(); - if note_tree.root() != header.note_root() { - return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into()); - } - - let notes = body - .output_notes() - .map(|(note_index, note)| { - let (details, nullifier) = match note { - OutputNote::Public(note) => { - (Some(NoteDetails::from(note.as_note())), Some(note.as_note().nullifier())) - }, - OutputNote::Private(_) => (None, None), - }; - - let inclusion_path = note_tree.open(note_index); - - let note_record = NoteRecord { - block_num, - note_index, - note_id: note.id().as_word(), - note_commitment: note.to_commitment(), - metadata: note.metadata().clone(), - details, - inclusion_path, - }; - - Ok((note_record, nullifier)) - }) - .collect::, InvalidBlockError>>()?; + let notes = Self::build_note_records(header, body)?; // Signals the transaction is ready to be committed, and the write lock can be acquired. let (allow_acquire, acquired_allowed) = oneshot::channel::<()>(); @@ -238,8 +111,12 @@ impl State { .in_current_span(), ); - // Wait for the message from the DB update task, that we ready to commit the DB transaction. - acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?; + // Wait for the message from the DB update task, that we ready to commit the DB + // transaction. + acquired_allowed + .instrument(info_span!(target: COMPONENT, "await_db_readiness")) + .await + .map_err(ApplyBlockError::ClosedChannel)?; // Awaiting the block saving task to complete without errors. block_save_task.await??; @@ -249,7 +126,11 @@ impl State { // We need to hold the write lock here to prevent inconsistency between the in-memory // state and the DB state. Thus, we need to wait for the DB update task to complete // successfully. - let mut inner = self.inner.write().await; + let mut inner = self + .inner + .write() + .instrument(info_span!(target: COMPONENT, "acquire_inner_write_lock")) + .await; // We need to check that neither the nullifier tree nor the account tree have changed // while we were waiting for the DB preparation task to complete. If either of them @@ -292,10 +173,176 @@ impl State { .in_current_span() .await?; - self.forest.write().await.apply_block_updates(block_num, account_deltas)?; + self.forest + .write() + .instrument(info_span!(target: COMPONENT, "acquire_forest_write_lock")) + .await + .apply_block_updates(block_num, account_deltas)?; info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful"); Ok(()) } + + /// Validates that the block header is consistent with the block body and the current state. + #[instrument(target = COMPONENT, skip_all, err)] + async fn validate_block_header( + &self, + header: &BlockHeader, + body: &BlockBody, + ) -> Result<(), ApplyBlockError> { + // Validate that header and body match. + let tx_commitment = body.transactions().commitment(); + if header.tx_commitment() != tx_commitment { + return Err(InvalidBlockError::InvalidBlockTxCommitment { + expected: tx_commitment, + actual: header.tx_commitment(), + } + .into()); + } + + let block_num = header.block_num(); + + // Validate that the applied block is the next block in sequence. + let prev_block = self + .db + .select_block_header_by_block_num(None) + .await? + .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?; + let expected_block_num = prev_block.block_num().child(); + if block_num != expected_block_num { + return Err(InvalidBlockError::NewBlockInvalidBlockNum { + expected: expected_block_num, + submitted: block_num, + } + .into()); + } + if header.prev_block_commitment() != prev_block.commitment() { + return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into()); + } + + Ok(()) + } + + /// Computes nullifier and account tree mutations, validating roots against the block header. + #[instrument(target = COMPONENT, skip_all, err)] + async fn compute_tree_mutations( + &self, + header: &BlockHeader, + body: &BlockBody, + ) -> Result<(Word, NullifierMutationSet, Word, AccountMutationSet), ApplyBlockError> { + let inner = self + .inner + .read() + .instrument(info_span!(target: COMPONENT, "acquire_inner_read_lock")) + .await; + + let block_num = header.block_num(); + + // nullifiers can be produced only once + let duplicate_nullifiers: Vec<_> = body + .created_nullifiers() + .iter() + .filter(|&nullifier| inner.nullifier_tree.get_block_num(nullifier).is_some()) + .copied() + .collect(); + if !duplicate_nullifiers.is_empty() { + return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into()); + } + + // new_block.chain_root must be equal to the chain MMR root prior to the update + let peaks = inner.blockchain.peaks(); + if peaks.hash_peaks() != header.chain_commitment() { + return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into()); + } + + // compute update for nullifier tree + let nullifier_tree_update = inner + .nullifier_tree + .compute_mutations( + body.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)), + ) + .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?; + + if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() { + // We do our best here to notify the serve routine, if it doesn't care (dropped the + // receiver) we can't do much. + let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( + InvalidBlockError::NewBlockInvalidNullifierRoot, + )); + return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into()); + } + + // compute update for account tree + let account_tree_update = inner + .account_tree + .compute_mutations( + body.updated_accounts() + .iter() + .map(|update| (update.account_id(), update.final_state_commitment())), + ) + .map_err(|e| match e { + HistoricalError::AccountTreeError(err) => { + InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err) + }, + HistoricalError::MerkleError(_) => { + panic!("Unexpected MerkleError during account tree mutation computation") + }, + })?; + + if account_tree_update.as_mutation_set().root() != header.account_root() { + let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( + InvalidBlockError::NewBlockInvalidAccountRoot, + )); + return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into()); + } + + Ok(( + inner.nullifier_tree.root(), + nullifier_tree_update, + inner.account_tree.root_latest(), + account_tree_update, + )) + } + + /// Builds note records with inclusion proofs from the block body. + fn build_note_records( + header: &BlockHeader, + body: &BlockBody, + ) -> Result)>, ApplyBlockError> { + let block_num = header.block_num(); + + let note_tree = body.compute_block_note_tree(); + if note_tree.root() != header.note_root() { + return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into()); + } + + let notes = body + .output_notes() + .map(|(note_index, note)| { + let (details, nullifier) = match note { + OutputNote::Public(note) => { + (Some(NoteDetails::from(note.as_note())), Some(note.as_note().nullifier())) + }, + OutputNote::Private(_) => (None, None), + }; + + let inclusion_path = note_tree.open(note_index); + + let note_record = NoteRecord { + block_num, + note_index, + note_id: note.id().as_word(), + note_commitment: note.to_commitment(), + metadata: note.metadata().clone(), + details, + inclusion_path, + }; + + Ok((note_record, nullifier)) + }) + .collect::, InvalidBlockError>>()?; + + Ok(notes) + } }