From 84867be940a53cc5dd9701d020f44266f0707bb9 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:02:08 +0200 Subject: [PATCH 1/5] Fix: mempool desync panic --- crates/block-producer/src/server/mod.rs | 47 +++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index f0266c98a..9ea1dbf2c 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -336,7 +336,28 @@ impl BlockProducerRpcServer { .map(Arc::new) .map_err(MempoolSubmissionError::StateConflict)?; - self.mempool.lock().await.lock().await.add_transaction(tx).map(Into::into) + // This is a hack-around a race condition where the store has committed block N+1, + // but the block has not been committed in the mempool yet. The transaction inputs + // will be authenticated against N+1 which will cause the mempool to panic. + // + // The hack-around is to detect this situation, and wait a short while to let the mempool + // catch up. + // + // FIXME: figure out an architectural solution to this instead. + let mempool = self.mempool.lock().await; + let mut mempool = loop { + // Ensure inner mempool mutex guard is dropped before we sleep. + { + let mempool = mempool.lock().await; + if mempool.chain_tip() >= tx.authentication_height() { + break mempool; + } + } + tracing::warn!("waiting for block to commit in mempool"); + tokio::time::sleep(Duration::from_millis(100)).await; + }; + + mempool.add_transaction(tx).map(Into::into) } #[instrument( @@ -374,7 +395,29 @@ impl BlockProducerRpcServer { txs.push(tx); } - self.mempool.lock().await.lock().await.add_user_batch(&txs).map(Into::into) + // This is a hack-around a race condition where the store has committed block N+1, + // but the block has not been committed in the mempool yet. The transaction inputs + // will be authenticated against N+1 which will cause the mempool to panic. + // + // The hack-around is to detect this situation, and wait a short while to let the mempool + // catch up. + // + // FIXME: figure out an architectural solution to this instead. + let mempool = self.mempool.lock().await; + let auth_height = txs.last().expect("user batch cannot be empty").authentication_height(); + let mut mempool = loop { + // Ensure inner mempool mutex guard is dropped before we sleep. + { + let mempool = mempool.lock().await; + if mempool.chain_tip() >= auth_height { + break mempool; + } + } + tracing::warn!("waiting for block to commit in mempool"); + tokio::time::sleep(Duration::from_millis(100)).await; + }; + + mempool.add_user_batch(&txs).map(Into::into) } } From eb3489b2f1825cc95a71aa32fb675cd2620c053c Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:13:21 +0200 Subject: [PATCH 2/5] Revert "Fix: mempool desync panic" This reverts commit 84867be940a53cc5dd9701d020f44266f0707bb9. --- crates/block-producer/src/server/mod.rs | 47 ++----------------------- 1 file changed, 2 insertions(+), 45 deletions(-) diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 9ea1dbf2c..f0266c98a 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -336,28 +336,7 @@ impl BlockProducerRpcServer { .map(Arc::new) .map_err(MempoolSubmissionError::StateConflict)?; - // This is a hack-around a race condition where the store has committed block N+1, - // but the block has not been committed in the mempool yet. The transaction inputs - // will be authenticated against N+1 which will cause the mempool to panic. - // - // The hack-around is to detect this situation, and wait a short while to let the mempool - // catch up. - // - // FIXME: figure out an architectural solution to this instead. - let mempool = self.mempool.lock().await; - let mut mempool = loop { - // Ensure inner mempool mutex guard is dropped before we sleep. - { - let mempool = mempool.lock().await; - if mempool.chain_tip() >= tx.authentication_height() { - break mempool; - } - } - tracing::warn!("waiting for block to commit in mempool"); - tokio::time::sleep(Duration::from_millis(100)).await; - }; - - mempool.add_transaction(tx).map(Into::into) + self.mempool.lock().await.lock().await.add_transaction(tx).map(Into::into) } #[instrument( @@ -395,29 +374,7 @@ impl BlockProducerRpcServer { txs.push(tx); } - // This is a hack-around a race condition where the store has committed block N+1, - // but the block has not been committed in the mempool yet. The transaction inputs - // will be authenticated against N+1 which will cause the mempool to panic. - // - // The hack-around is to detect this situation, and wait a short while to let the mempool - // catch up. - // - // FIXME: figure out an architectural solution to this instead. - let mempool = self.mempool.lock().await; - let auth_height = txs.last().expect("user batch cannot be empty").authentication_height(); - let mut mempool = loop { - // Ensure inner mempool mutex guard is dropped before we sleep. - { - let mempool = mempool.lock().await; - if mempool.chain_tip() >= auth_height { - break mempool; - } - } - tracing::warn!("waiting for block to commit in mempool"); - tokio::time::sleep(Duration::from_millis(100)).await; - }; - - mempool.add_user_batch(&txs).map(Into::into) + self.mempool.lock().await.lock().await.add_user_batch(&txs).map(Into::into) } } From 8f63c4c0d74c6040ff41a45eae6f38875aa3bfa1 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:23:22 +0200 Subject: [PATCH 3/5] Consider pending block as chain tip in mempool wip --- crates/block-producer/src/mempool/mod.rs | 37 ++++++++++--------- crates/block-producer/src/mempool/tests.rs | 22 +++++++---- .../src/mempool/tests/add_transaction.rs | 31 +++++++++------- 3 files changed, 51 insertions(+), 39 deletions(-) diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 734cb9204..fbfceda4e 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -203,9 +203,11 @@ impl Mempool { /// Returns the current chain tip height as seen by the mempool. /// - /// This reflects the latest committed block that the block producer is aware of. + /// This includes the block currently being built, if any. pub fn chain_tip(&self) -> BlockNumber { - self.chain_tip + self.pending_block + .as_ref() + .map_or(self.chain_tip, |pending| pending.block_number) } // TRANSACTION & BATCH LIFECYCLE @@ -246,7 +248,7 @@ impl Mempool { self.subscription.transaction_added(&tx); self.inject_telemetry(); - Ok(self.chain_tip) + Ok(self.chain_tip()) } #[instrument(target = COMPONENT, name = "mempool.add_user_batch", skip_all)] @@ -283,7 +285,7 @@ impl Mempool { } self.inject_telemetry(); - Ok(self.chain_tip) + Ok(self.chain_tip()) } /// Returns a set of transactions for the next batch. @@ -364,7 +366,7 @@ impl Mempool { self.pending_block.as_ref().unwrap().block_number ); - let block_number = self.chain_tip.child(); + let block_number = self.chain_tip().child(); let batches = self.batches.select_block(self.config.block_budget); let block = SelectedBlock { block_number, batches }; self.pending_block = Some(block.clone()); @@ -389,7 +391,6 @@ impl Mempool { /// Panics if there is no matching block in flight. #[instrument(target = COMPONENT, name = "mempool.commit_block", skip_all)] pub fn commit_block(&mut self, block_header: BlockHeader) { - assert_eq!(self.chain_tip.child(), block_header.block_num()); let block = self .pending_block .take_if(|pending| pending.block_number == block_header.block_num()) @@ -542,18 +543,18 @@ impl Mempool { /// Transactions from batches are requeued. Expired transactions and their descendants are then /// reverted as well. fn revert_expired(&mut self) -> HashSet { - let batches = self.batches.revert_expired(self.chain_tip); + let batches = self.batches.revert_expired(self.chain_tip()); for batch in batches { self.transactions.requeue_transactions(&batch); } - self.transactions.revert_expired(self.chain_tip) + self.transactions.revert_expired(self.chain_tip()) } /// Rejects authentication heights that fall outside the overlap guaranteed by the locally /// retained state. /// - /// The acceptable window is `[chain_tip - state_retention + 1, chain_tip]`; values below this - /// range are rejected as stale because the mempool no longer tracks the intermediate history. + /// If our oldest local block is at `N`, then we allow `N-1` and newer since this means we're + /// covering the full blockchain. /// /// # Panics /// @@ -565,11 +566,13 @@ impl Mempool { authentication_height: BlockNumber, ) -> Result<(), MempoolSubmissionError> { let limit = self - .chain_tip - .checked_sub(self.committed_blocks.len() as u32) - .expect("number of committed blocks cannot exceed the chain tip"); + .committed_blocks + .front() + .map_or(self.chain_tip(), |block| block.block_number) + .parent() + .unwrap_or_default(); - if authentication_height < limit { + if authentication_height < dbg!(limit) { return Err(MempoolSubmissionError::StaleInputs { input_block: authentication_height, stale_limit: limit, @@ -577,16 +580,16 @@ impl Mempool { } assert!( - authentication_height <= self.chain_tip, + authentication_height <= self.chain_tip(), "Authentication height {authentication_height} exceeded the chain tip {}", - self.chain_tip + self.chain_tip() ); Ok(()) } fn expiration_check(&self, expired_at: BlockNumber) -> Result<(), MempoolSubmissionError> { - let limit = self.chain_tip + self.config.expiration_slack; + let limit = self.chain_tip() + self.config.expiration_slack; if expired_at <= limit { return Err(MempoolSubmissionError::Expired { expired_at, limit }); } diff --git a/crates/block-producer/src/mempool/tests.rs b/crates/block-producer/src/mempool/tests.rs index 6a6e15f97..946891207 100644 --- a/crates/block-producer/src/mempool/tests.rs +++ b/crates/block-producer/src/mempool/tests.rs @@ -131,31 +131,37 @@ fn failed_batch_transactions_are_requeued() { fn block_commit_reverts_expired_txns() { let (mut uut, _) = Mempool::for_tests(); uut.config.expiration_slack = 0; + let mut reference = uut.clone(); let tx_to_commit = MockProvenTxBuilder::with_account_index(0).build(); let tx_to_commit = Arc::new(AuthenticatedTransaction::from_inner(tx_to_commit)); - // Force the tx into a pending block. + // Force the tx into the next block by batching it. uut.add_transaction(tx_to_commit.clone()).unwrap(); uut.select_batch().unwrap(); uut.commit_batch(Arc::new(ProvenBatch::mocked_from_transactions([ tx_to_commit.raw_proven_transaction() ]))); - let block = uut.select_block(); - // A reverted transaction behaves as if it never existed, the current state is the expected - // outcome, plus an extra committed block at the end. - let mut reference = uut.clone(); - // Add a new transaction which will expire when the pending block is committed. + // Add a new transaction which will expire when the block is committed. let tx_to_revert = MockProvenTxBuilder::with_account_index(1) - .expiration_block_num(block.block_number) + .expiration_block_num(uut.chain_tip().child()) .build(); let tx_to_revert = Arc::new(AuthenticatedTransaction::from_inner(tx_to_revert)); uut.add_transaction(tx_to_revert).unwrap(); - // Commit the pending block which should revert the above tx. + // Create and commit the block which should revert the above tx. + let block = uut.select_block(); let arb_header = BlockHeader::mock(block.block_number, None, None, &[], Word::empty()); uut.commit_block(arb_header.clone()); + + // A reverted transaction behaves as if it never existed. + reference.add_transaction(tx_to_commit.clone()).unwrap(); + reference.select_batch().unwrap(); + reference.commit_batch(Arc::new(ProvenBatch::mocked_from_transactions([ + tx_to_commit.raw_proven_transaction() + ]))); + reference.select_block(); reference.commit_block(arb_header); assert_eq!(uut, reference); diff --git a/crates/block-producer/src/mempool/tests/add_transaction.rs b/crates/block-producer/src/mempool/tests/add_transaction.rs index dc73f317d..27fdd7cee 100644 --- a/crates/block-producer/src/mempool/tests/add_transaction.rs +++ b/crates/block-producer/src/mempool/tests/add_transaction.rs @@ -79,12 +79,13 @@ mod tx_expiration { #[test] fn expiration_after_slack_limit_is_accepted() { let mut uut = setup(); - let limit = uut.chain_tip + uut.config.expiration_slack; + let limit = uut.chain_tip() + uut.config.expiration_slack; let tx = MockProvenTxBuilder::with_account_index(0) .expiration_block_num(limit.child()) .build(); - let tx = AuthenticatedTransaction::from_inner(tx).with_authentication_height(uut.chain_tip); + let tx = + AuthenticatedTransaction::from_inner(tx).with_authentication_height(uut.chain_tip()); let tx = Arc::new(tx); uut.add_transaction(tx).unwrap(); } @@ -92,14 +93,14 @@ mod tx_expiration { #[test] fn expiration_within_slack_limit_is_rejected() { let mut uut = setup(); - let limit = uut.chain_tip + uut.config.expiration_slack; + let limit = uut.chain_tip() + uut.config.expiration_slack; - for i in uut.chain_tip.child().as_u32()..=limit.as_u32() { + for i in uut.chain_tip().child().as_u32()..=limit.as_u32() { let tx = MockProvenTxBuilder::with_account_index(0) .expiration_block_num(i.into()) .build(); - let tx = - AuthenticatedTransaction::from_inner(tx).with_authentication_height(uut.chain_tip); + let tx = AuthenticatedTransaction::from_inner(tx) + .with_authentication_height(uut.chain_tip()); let tx = Arc::new(tx); let result = uut.add_transaction(tx); @@ -115,9 +116,10 @@ mod tx_expiration { fn already_expired_is_rejected() { let mut uut = setup(); let tx = MockProvenTxBuilder::with_account_index(0) - .expiration_block_num(uut.chain_tip) + .expiration_block_num(uut.chain_tip()) .build(); - let tx = AuthenticatedTransaction::from_inner(tx).with_authentication_height(uut.chain_tip); + let tx = + AuthenticatedTransaction::from_inner(tx).with_authentication_height(uut.chain_tip()); let tx = Arc::new(tx); let result = uut.add_transaction(tx); @@ -154,11 +156,12 @@ mod authentication_height { fn stale_inputs_are_rejected() { let mut uut = setup(); - let oldest_local = uut.chain_tip.as_u32() - uut.config.state_retention.get() as u32 + 1; + let oldest_mempool = uut.committed_blocks.front().map(|block| block.block_number).unwrap(); + dbg!(oldest_mempool); let tx = MockProvenTxBuilder::with_account_index(0).build(); let tx = AuthenticatedTransaction::from_inner(tx) - .with_authentication_height((oldest_local - 2).into()); + .with_authentication_height((oldest_mempool.as_u32() - 2).into()); let tx = Arc::new(tx); uut.add_transaction(tx).unwrap_err(); } @@ -173,7 +176,7 @@ mod authentication_height { let tx = MockProvenTxBuilder::with_account_index(0).build(); let tx = AuthenticatedTransaction::from_inner(tx) - .with_authentication_height(uut.chain_tip.child()); + .with_authentication_height(uut.chain_tip().child()); let tx = Arc::new(tx); let _ = uut.add_transaction(tx); } @@ -186,9 +189,9 @@ mod authentication_height { fn inputs_from_within_overlap_are_accepted() { let mut uut = setup(); - let oldest_local = uut.chain_tip.as_u32() - uut.config.state_retention.get() as u32 + 1; + let oldest_local = uut.chain_tip().as_u32() - uut.config.state_retention.get() as u32 + 1; - for i in oldest_local - 1..=uut.chain_tip.as_u32() { + for i in oldest_local - 1..=uut.chain_tip().as_u32() { let tx = MockProvenTxBuilder::with_account_index(i).build(); let tx = AuthenticatedTransaction::from_inner(tx).with_authentication_height(i.into()); let tx = Arc::new(tx); @@ -199,7 +202,7 @@ mod authentication_height { result, Ok(..), "Failed run with authentication height {i}, chain tip {} and oldest local {oldest_local}", - uut.chain_tip + uut.chain_tip() ); } } From 0109f2230abd1820e9f8df54c7033b8e54609a27 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Thu, 23 Apr 2026 10:18:21 +0200 Subject: [PATCH 4/5] Review comments --- CHANGELOG.md | 2 +- crates/block-producer/src/mempool/mod.rs | 11 ++++++----- .../src/mempool/tests/add_transaction.rs | 1 - 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98336a33f..af8f77f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ ## v0.14.10 (TBD) - Added `accept`, `origin`, `user-agent`, `forwarded`, `x-forwarded-for` and `x-real-ip` headers to telemetry for gRPC requests ([#1982](https://github.com/0xMiden/node/pull/1982). -- Fixed occasional mempool panic during transaction submission, causing the lock to be held for longer than expected ([#1984](https://github.com/0xMiden/node/pull/1984). +- Fixed occasional mempool panic during transaction submission, causing the lock to be held for longer than expected ([#1984](https://github.com/0xMiden/node/pull/1984)). ## v0.14.9 (2026-04-21) diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index fbfceda4e..8bb3e62a7 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -174,7 +174,7 @@ pub struct Mempool { /// committed it is appended here, and the oldest block's state is pruned. committed_blocks: VecDeque, - chain_tip: BlockNumber, + committed_chain_tip: BlockNumber, config: MempoolConfig, subscription: subscription::SubscriptionProvider, @@ -192,7 +192,7 @@ impl Mempool { fn new(chain_tip: BlockNumber, config: MempoolConfig) -> Mempool { Self { config, - chain_tip, + committed_chain_tip: chain_tip, subscription: SubscriptionProvider::new(chain_tip), transactions: graph::TransactionGraph::default(), batches: graph::BatchGraph::default(), @@ -207,7 +207,7 @@ impl Mempool { pub fn chain_tip(&self) -> BlockNumber { self.pending_block .as_ref() - .map_or(self.chain_tip, |pending| pending.block_number) + .map_or(self.committed_chain_tip, |pending| pending.block_number) } // TRANSACTION & BATCH LIFECYCLE @@ -391,6 +391,7 @@ impl Mempool { /// Panics if there is no matching block in flight. #[instrument(target = COMPONENT, name = "mempool.commit_block", skip_all)] pub fn commit_block(&mut self, block_header: BlockHeader) { + assert_eq!(self.committed_chain_tip.child(), block_header.block_num()); let block = self .pending_block .take_if(|pending| pending.block_number == block_header.block_num()) @@ -403,7 +404,7 @@ impl Mempool { .map(miden_protocol::transaction::TransactionHeader::id) .collect(); - self.chain_tip = self.chain_tip.child(); + self.committed_chain_tip = self.committed_chain_tip.child(); self.subscription.block_committed(block_header, tx_ids); self.committed_blocks.push_back(block); @@ -572,7 +573,7 @@ impl Mempool { .parent() .unwrap_or_default(); - if authentication_height < dbg!(limit) { + if authentication_height < limit { return Err(MempoolSubmissionError::StaleInputs { input_block: authentication_height, stale_limit: limit, diff --git a/crates/block-producer/src/mempool/tests/add_transaction.rs b/crates/block-producer/src/mempool/tests/add_transaction.rs index 27fdd7cee..6abe59657 100644 --- a/crates/block-producer/src/mempool/tests/add_transaction.rs +++ b/crates/block-producer/src/mempool/tests/add_transaction.rs @@ -157,7 +157,6 @@ mod authentication_height { let mut uut = setup(); let oldest_mempool = uut.committed_blocks.front().map(|block| block.block_number).unwrap(); - dbg!(oldest_mempool); let tx = MockProvenTxBuilder::with_account_index(0).build(); let tx = AuthenticatedTransaction::from_inner(tx) From 5dceb94c0abe843f426c490ff0c25bb9f2571a42 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Thu, 23 Apr 2026 10:19:38 +0200 Subject: [PATCH 5/5] Ensure add_transaction and add_batch return the committed tip --- crates/block-producer/src/mempool/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 8bb3e62a7..9395a4111 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -248,7 +248,7 @@ impl Mempool { self.subscription.transaction_added(&tx); self.inject_telemetry(); - Ok(self.chain_tip()) + Ok(self.committed_chain_tip) } #[instrument(target = COMPONENT, name = "mempool.add_user_batch", skip_all)] @@ -285,7 +285,7 @@ impl Mempool { } self.inject_telemetry(); - Ok(self.chain_tip()) + Ok(self.committed_chain_tip) } /// Returns a set of transactions for the next batch.