From 582ac52f057560cce1d7a72b31c18e77d02f0cb6 Mon Sep 17 00:00:00 2001 From: Ekleog-Polygon Date: Mon, 11 May 2026 11:51:47 +0200 Subject: [PATCH] revert: restore per-epoch certificate rate limiting (#1514) This reverts commit 25692a8e7ec5c863787b537ecdeec95811751df5 ("feat: remove per epoch rate limiting (#1387)"). Re-enables MAX_CERTIFICATE_PER_EPOCH enforcement and epoch-capacity gating in NetworkTask so that only one certificate per network is processed per epoch. --- .../src/network_task.rs | 50 +- .../src/network_task/tests.rs | 1242 +++++++++-------- .../src/network_task/tests/status.rs | 8 + .../src/stores/per_epoch/mod.rs | 29 +- .../src/stores/per_epoch/tests.rs | 219 +-- 5 files changed, 728 insertions(+), 820 deletions(-) diff --git a/crates/agglayer-certificate-orchestrator/src/network_task.rs b/crates/agglayer-certificate-orchestrator/src/network_task.rs index 738a86742..3bfadee04 100644 --- a/crates/agglayer-certificate-orchestrator/src/network_task.rs +++ b/crates/agglayer-certificate-orchestrator/src/network_task.rs @@ -13,7 +13,7 @@ use agglayer_types::{ use pessimistic_proof::{ core::commitment::PessimisticRootCommitmentVersion, local_state::StateCommitment, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; @@ -137,11 +137,14 @@ pub(crate) struct NetworkTask, /// The local network state of the network task. local_state: Box, + /// The clock reference to subscribe to the epoch events and check for /// current epoch. clock_ref: ClockRef, /// The stream of new certificates to certify. certificate_stream: mpsc::Receiver, + /// Flag to indicate if the network is at capacity for the current epoch. + at_capacity_for_epoch: bool, /// latest certificate settled latest_settled: Option, } @@ -190,6 +193,7 @@ where local_state, clock_ref, certificate_stream, + at_capacity_for_epoch: false, latest_settled, settlement_client, }) @@ -208,6 +212,8 @@ where ) -> Result { info!("Starting the network task for network {}", self.network_id); + let mut stream_epoch = self.clock_ref.subscribe()?; + let current_epoch = self.clock_ref.current_epoch(); // Start from the latest settled certificate to define the next expected height @@ -221,6 +227,7 @@ where debug!("Current network height is {}", current_height); if epoch == current_epoch { debug!("Already settled for the epoch {current_epoch}"); + self.at_capacity_for_epoch = true; } current_height.next() @@ -239,7 +246,7 @@ where return Ok(self.network_id); } - result = self.make_progress(&mut next_expected_height, &mut first_run, &cancellation_token) => { + result = self.make_progress(&mut stream_epoch, &mut next_expected_height, &mut first_run, &cancellation_token) => { if let Err(error)= result { error!("Error during the certification process: {}", error); @@ -253,9 +260,10 @@ where } } - #[instrument(skip(self, cancellation_token), fields(certificate_id))] + #[instrument(skip(self, stream_epoch, cancellation_token), fields(certificate_id))] async fn make_progress( &mut self, + stream_epoch: &mut tokio::sync::broadcast::Receiver, next_expected_height: &mut Height, first_run: &mut bool, cancellation_token: &CancellationToken, @@ -264,7 +272,39 @@ where *first_run = false; } else { tokio::select! { - Some(NewCertificate { certificate_id, height, .. }) = self.certificate_stream.recv() => { + event = stream_epoch.recv() => { + let network_id = self.network_id; + match event { + Ok(agglayer_clock::Event::EpochEnded(epoch)) => { + info!("Received an epoch event: {}", epoch); + + let current_epoch = self.clock_ref.current_epoch(); + if epoch != EpochNumber::ZERO && epoch.next() < current_epoch { + warn!("Received an epoch event for epoch {epoch} which is outdated, current epoch is {current_epoch}"); + + return Ok(()); + } + match self.latest_settled { + Some(SettledCertificate(_, _, epoch, _)) if epoch == current_epoch => { + warn!("Network {network_id} is at capacity for the epoch {current_epoch}"); + return Ok(()); + }, + _ => { + self.at_capacity_for_epoch = false; + } + } + } + Err(broadcast::error::RecvError::Lagged(num_skipped)) => { + warn!("Network {network_id} skipped {num_skipped} epoch ticks"); + return Ok(()); + } + Err(broadcast::error::RecvError::Closed) => { + error!("Epoch channel closed for network {network_id}"); + return Err(Error::InternalError("epoch channel closed".into())); + } + } + } + Some(NewCertificate { certificate_id, height, .. }) = self.certificate_stream.recv(), if !self.at_capacity_for_epoch => { info!( hash = certificate_id.to_string(), "Received a certificate event for {certificate_id} at height {height}" @@ -505,6 +545,7 @@ where continue; } Some(NetworkTaskMessage::CertificateSettled { settled_certificate, height, .. }) => { + self.at_capacity_for_epoch = true; let epoch_number = settled_certificate.2; let certificate_index = settled_certificate.3; self.latest_settled = Some(settled_certificate); @@ -547,6 +588,7 @@ where } Some(NetworkTaskMessage::CertificateErrored { .. }) => { // The certificate task already logged everything that should be logged. + self.at_capacity_for_epoch = false; break; } Some(NetworkTaskMessage::CheckSettlementTx { settlement_tx_hash, tx_mined_notifier, .. }) => { diff --git a/crates/agglayer-certificate-orchestrator/src/network_task/tests.rs b/crates/agglayer-certificate-orchestrator/src/network_task/tests.rs index 0e0b67cf6..59dc7fa4f 100644 --- a/crates/agglayer-certificate-orchestrator/src/network_task/tests.rs +++ b/crates/agglayer-certificate-orchestrator/src/network_task/tests.rs @@ -12,7 +12,7 @@ use agglayer_types::{ aggchain_data::CertificateAggchainDataCtx, Certificate, CertificateStatus, L1WitnessCtx, Metadata, PessimisticRootInput, }; -use mockall::predicate::{always, eq}; +use mockall::predicate::{always, eq, in_iter}; use pessimistic_proof::core::commitment::PessimisticRootCommitmentVersion; use rstest::rstest; @@ -28,129 +28,208 @@ mod status; const SETTLEMENT_TX_HASH_1: SettlementTxHash = SettlementTxHash::new(Digest([1; 32])); const SETTLEMENT_TX_HASH_2: SettlementTxHash = SettlementTxHash::new(Digest([2; 32])); -// Helper functions to reduce test duplication +#[rstest] +#[tokio::test] +#[timeout(Duration::from_secs(1))] +async fn start_from_zero() { + let mut pending = MockPendingStore::new(); + let mut state = MockStateStore::new(); + let mut certifier = MockCertifier::new(); + let mut settlement_client = MockSettlementClient::new(); + let clock_ref = clock(); + let network_id = 1.into(); + let (sender, certificate_stream) = mpsc::channel(1); -fn create_test_certificate(forest: &mut Forest, height: Height) -> Certificate { - if height == Height::ZERO { - forest.apply_events( - &[(USDC, 10.try_into().unwrap())], - &[(USDC, 1.try_into().unwrap())], - ) - } else { - let mut cert = forest.apply_events(&[], &[(USDC, 1.try_into().unwrap())]); - cert.height = height; - cert - } -} + let certificate = Certificate::new_for_test(network_id, Height::ZERO); + let certificate_id = certificate.hash(); -fn setup_certifier_mock( - certifier: &mut MockCertifier, - pending_store: Arc, - network_id: NetworkId, - times: usize, - specific_height: Option, -) { - let mut expectation = certifier.expect_certify(); - - if times == 1 { - expectation = expectation.once(); - } else { - expectation = expectation.times(times); - } + pending + .expect_get_certificate() + .once() + .with(eq(network_id), eq(Height::ZERO)) + .returning(|network_id, height| { + let certificate = Certificate::new_for_test(network_id, height); + Ok(Some(certificate)) + }); - if let Some(height) = specific_height { - expectation = expectation.with(always(), eq(network_id), eq(height)); - } else { - expectation = expectation.with(always(), eq(network_id), always()); - } + state + .expect_get_latest_settled_certificate_per_network() + .once() + .with(eq(network_id)) + .returning(|_| Ok(None)); - expectation.returning(move |mut new_state, network, height| { - let certificate = pending_store - .get_certificate(network, height) - .expect("Failed to get certificate") - .expect("Certificate not found"); - - let signer = agglayer_types::Address::new([0; 20]); - let ctx_from_l1 = L1WitnessCtx { - l1_info_root: certificate - .l1_info_root() - .expect("Failed to get L1 info root") - .unwrap_or_default(), - prev_pessimistic_root: PessimisticRootInput::Computed( - PessimisticRootCommitmentVersion::V2, - ), - aggchain_data_ctx: CertificateAggchainDataCtx::LegacyEcdsa { signer }, - }; - - let _ = new_state - .apply_certificate(&certificate, ctx_from_l1) - .expect("Failed to apply certificate"); - - Ok(CertifierOutput { - certificate, - height, - new_state, - network, - new_pp_root: Digest::ZERO, - }) - }); -} + state + .expect_get_certificate_header() + .once() + .with(eq(certificate_id)) + .returning(|certificate_id| { + Ok(Some(agglayer_types::CertificateHeader { + network_id: 1.into(), + height: Height::ZERO, + epoch_number: None, + certificate_index: None, + certificate_id: *certificate_id, + prev_local_exit_root: [1; 32].into(), + new_local_exit_root: [0; 32].into(), + metadata: Metadata::ZERO, + status: CertificateStatus::Pending, + settlement_tx_hash: None, + })) + }); + + certifier + .expect_certify() + .once() + .with(always(), eq(network_id), eq(Height::ZERO)) + .return_once(move |new_state, network_id, _height| { + let result = crate::CertifierOutput { + certificate, + height: Height::ZERO, + new_state, + network: network_id, + new_pp_root: Digest::ZERO, + }; + + Ok(result) + }); + + state + .expect_read_local_network_state() + .returning(|_| Ok(Default::default())); + + state + .expect_write_local_network_state() + .returning(|_, _, _| Ok(())); + + pending + .expect_set_latest_proven_certificate_per_network() + .once() + .with(eq(network_id), eq(Height::ZERO), eq(certificate_id)) + .returning(|_, _, _| Ok(())); + + state + .expect_update_certificate_header_status() + .once() + .with(eq(certificate_id), eq(CertificateStatus::Proven)) + .returning(|_, _| Ok(())); -fn setup_settlement_mock( - settlement_client: &mut MockSettlementClient, - certificate_id: CertificateId, - settlement_hash: SettlementTxHash, - nonce: u64, - epoch: EpochNumber, - index: CertificateIndex, -) { settlement_client .expect_submit_certificate_settlement() .once() .withf(move |i, _| *i == certificate_id) - .returning(move |_, _| Ok(settlement_hash)); + .returning(move |_, _| Ok(SettlementTxHash::for_tests())); settlement_client .expect_fetch_settlement_nonce() .once() - .with(eq(settlement_hash)) - .returning(move |_| { + .with(eq(SettlementTxHash::for_tests())) + .returning(|_| { Ok(Some(NonceInfo { - nonce, + nonce: 1, previous_max_fee_per_gas: 0, previous_max_priority_fee_per_gas: None, })) }); + state + .expect_update_settlement_tx_hash() + .once() + .withf(move |i, t, _f, _| *i == certificate_id && *t == SettlementTxHash::for_tests()) + .returning(|_, _, _, _| Ok(())); + settlement_client .expect_wait_for_settlement() .once() - .withf(move |t, i| *t == settlement_hash && *i == certificate_id) - .returning(move |_, _| Ok((epoch, index))); + .withf(move |t, i| *t == SettlementTxHash::for_tests() && *i == certificate_id) + .returning(move |_, _| Ok((EpochNumber::ZERO, CertificateIndex::ZERO))); + + state + .expect_update_certificate_header_status() + .once() + .with(eq(certificate_id), eq(CertificateStatus::Settled)) + .returning(|_, _| Ok(())); + + state + .expect_set_latest_settled_certificate_for_network() + .once() + .with( + eq(network_id), + eq(Height::ZERO), + eq(certificate_id), + eq(EpochNumber::ZERO), + eq(CertificateIndex::ZERO), + ) + .returning(|_, _, _, _, _| Ok(())); + + let mut task = NetworkTask::new( + Arc::new(pending), + Arc::new(state), + Arc::new(certifier), + Arc::new(settlement_client), + clock_ref, + network_id, + certificate_stream, + ) + .expect("Failed to create a new network task"); + + let mut epochs = task.clock_ref.subscribe().unwrap(); + let mut next_expected_height = Height::ZERO; + + let _ = sender + .send(NewCertificate { + certificate_id, + height: Height::ZERO, + }) + .await; + + let mut first_run = true; + task.make_progress( + &mut epochs, + &mut next_expected_height, + &mut first_run, + &CancellationToken::new(), + ) + .await + .unwrap(); + + assert_eq!(next_expected_height, Height::new(1)); } #[rstest] #[tokio::test] #[timeout(Duration::from_secs(1))] -async fn start_from_zero() { +async fn one_per_epoch() { let mut pending = MockPendingStore::new(); let mut state = MockStateStore::new(); let mut certifier = MockCertifier::new(); let mut settlement_client = MockSettlementClient::new(); let clock_ref = clock(); let network_id = 1.into(); - let (sender, certificate_stream) = mpsc::channel(1); + let (sender, certificate_stream) = mpsc::channel(100); let certificate = Certificate::new_for_test(network_id, Height::ZERO); + let certificate2 = Certificate::new_for_test(network_id, Height::new(1)); let certificate_id = certificate.hash(); + let certificate_id2 = certificate2.hash(); pending .expect_get_certificate() .once() .with(eq(network_id), eq(Height::ZERO)) .returning(|network_id, height| { - let certificate = Certificate::new_for_test(network_id, height); - Ok(Some(certificate)) + let c = Certificate::new_for_test(network_id, height); + + Ok(Some(c)) + }); + + pending + .expect_get_certificate() + .never() + .with(eq(network_id), eq(Height::new(1))) + .returning(|network_id, height| { + let c = Certificate::new_for_test(network_id, height); + + Ok(Some(c)) }); state @@ -178,6 +257,24 @@ async fn start_from_zero() { })) }); + state + .expect_get_certificate_header() + .never() + .with(eq(certificate_id2)) + .returning(|certificate_id| { + Ok(Some(agglayer_types::CertificateHeader { + network_id: 1.into(), + height: Height::new(1), + epoch_number: None, + certificate_index: None, + certificate_id: *certificate_id, + prev_local_exit_root: [1; 32].into(), + new_local_exit_root: [0; 32].into(), + metadata: Metadata::ZERO, + status: CertificateStatus::Pending, + settlement_tx_hash: None, + })) + }); certifier .expect_certify() .once() @@ -202,6 +299,22 @@ async fn start_from_zero() { .expect_write_local_network_state() .returning(|_, _, _| Ok(())); + certifier + .expect_certify() + .never() + .with(always(), eq(network_id), eq(Height::new(1))) + .return_once(move |new_state, network_id, _height| { + let result = crate::CertifierOutput { + certificate: certificate2, + height: Height::new(1), + new_state, + network: network_id, + new_pp_root: Digest::ZERO, + }; + + Ok(result) + }); + pending .expect_set_latest_proven_certificate_per_network() .once() @@ -273,17 +386,28 @@ async fn start_from_zero() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; - let _ = sender + sender .send(NewCertificate { certificate_id, height: Height::ZERO, }) - .await; + .await + .expect("Failed to send the certificate"); + + sender + .send(NewCertificate { + certificate_id: certificate_id2, + height: Height::new(1), + }) + .await + .expect("Failed to send the certificate"); let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -291,6 +415,19 @@ async fn start_from_zero() { .await .unwrap(); + assert_eq!(next_expected_height, Height::new(1)); + tokio::time::timeout( + Duration::from_millis(100), + task.make_progress( + &mut epochs, + &mut next_expected_height, + &mut first_run, + &CancellationToken::new(), + ), + ) + .await + .expect_err("Should have timed out"); + assert_eq!(next_expected_height, Height::new(1)); } @@ -539,6 +676,7 @@ async fn retries() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; sender @@ -559,6 +697,7 @@ async fn retries() { let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -569,6 +708,7 @@ async fn retries() { assert_eq!(next_expected_height, Height::ZERO); task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -580,18 +720,21 @@ async fn retries() { } #[rstest] -#[tokio::test] +#[test_log::test(tokio::test)] #[timeout(Duration::from_secs(1))] -async fn timeout_certifier() { +async fn changing_epoch_triggers_certify() { let mut pending = MockPendingStore::new(); let mut state = MockStateStore::new(); let mut certifier = MockCertifier::new(); + let mut settlement_client = MockSettlementClient::new(); let clock_ref = clock(); let network_id = 1.into(); let (sender, certificate_stream) = mpsc::channel(100); let certificate = Certificate::new_for_test(network_id, Height::ZERO); + let certificate2 = Certificate::new_for_test(network_id, Height::new(1)); let certificate_id = certificate.hash(); + let certificate_id2 = certificate2.hash(); pending .expect_get_certificate() @@ -599,6 +742,26 @@ async fn timeout_certifier() { .with(eq(network_id), eq(Height::ZERO)) .returning(|network_id, height| Ok(Some(Certificate::new_for_test(network_id, height)))); + pending + .expect_get_certificate() + .once() + .with(eq(network_id), eq(Height::new(1))) + .returning(|network_id, height| Ok(Some(Certificate::new_for_test(network_id, height)))); + + state + .expect_read_local_network_state() + .returning(|_| Ok(Default::default())); + + state + .expect_write_local_network_state() + .returning(|_, _, _| Ok(())); + + state + .expect_get_latest_settled_certificate_per_network() + .once() + .with(eq(network_id)) + .returning(|_| Ok(None)); + state .expect_get_certificate_header() .once() @@ -610,150 +773,188 @@ async fn timeout_certifier() { epoch_number: None, certificate_index: None, certificate_id: *certificate_id, - prev_local_exit_root: [1; 32].into(), new_local_exit_root: [0; 32].into(), + prev_local_exit_root: [1; 32].into(), metadata: Metadata::ZERO, status: CertificateStatus::Pending, settlement_tx_hash: None, })) }); - certifier - .expect_certify() + state + .expect_get_certificate_header() .once() - .with(always(), eq(network_id), eq(Height::ZERO)) - .return_once(move |_new_state, _network_id, _height| { - Err(CertificationError::InternalError("TimedOut".to_string())) + .with(eq(certificate_id2)) + .returning(|certificate_id| { + Ok(Some(agglayer_types::CertificateHeader { + network_id: 1.into(), + height: Height::new(1), + epoch_number: None, + certificate_index: None, + certificate_id: *certificate_id, + prev_local_exit_root: [1; 32].into(), + new_local_exit_root: [0; 32].into(), + metadata: Metadata::ZERO, + status: CertificateStatus::Pending, + settlement_tx_hash: None, + })) }); - let expected_error = String::from("Internal error: TimedOut"); + certifier + .expect_certify() + .once() + .with(always(), eq(network_id), eq(Height::ZERO)) + .return_once(move |new_state, network_id, _height| { + let result = crate::CertifierOutput { + certificate, + height: Height::ZERO, + new_state, + network: network_id, + new_pp_root: Digest::ZERO, + }; - state - .expect_get_latest_settled_certificate_per_network() + Ok(result) + }); + + certifier + .expect_certify() .once() - .with(eq(network_id)) - .returning(|_| Ok(None)); + .with(always(), eq(network_id), eq(Height::new(1))) + .return_once(move |new_state, network_id, _height| { + let result = crate::CertifierOutput { + certificate: certificate2, + height: Height::new(1), + new_state, + network: network_id, + new_pp_root: Digest::ZERO, + }; + + Ok(result) + }); + + pending + .expect_set_latest_proven_certificate_per_network() + .once() + .with(eq(network_id), eq(Height::ZERO), eq(certificate_id)) + .returning(|_, _, _| Ok(())); + + pending + .expect_set_latest_proven_certificate_per_network() + .once() + .with(eq(network_id), eq(Height::new(1)), eq(certificate_id2)) + .returning(|_, _, _| Ok(())); state .expect_update_certificate_header_status() .once() - .withf(move |id, status| { - if *id != certificate_id { - return false; - } - let CertificateStatus::InError { error } = status else { - return false; - }; - let CertificateStatusError::InternalError(error) = &**error else { - return false; - }; - error.starts_with(&expected_error) - }) + .with(eq(certificate_id), eq(CertificateStatus::Proven)) .returning(|_, _| Ok(())); state - .expect_read_local_network_state() - .returning(|_| Ok(Default::default())); - - let mut task = NetworkTask::new( - Arc::new(pending), - Arc::new(state), - Arc::new(certifier), - Arc::new(MockSettlementClient::new()), - clock_ref.clone(), - network_id, - certificate_stream, - ) - .expect("Failed to create a new network task"); + .expect_update_certificate_header_status() + .once() + .with(eq(certificate_id2), eq(CertificateStatus::Proven)) + .returning(|_, _| Ok(())); - let mut next_expected_height = Height::ZERO; + settlement_client + .expect_submit_certificate_settlement() + .once() + .withf(move |i, _| *i == certificate_id) + .returning(move |_, _| Ok(SETTLEMENT_TX_HASH_1)); - sender - .send(NewCertificate { - certificate_id, - height: Height::ZERO, - }) - .await - .expect("Failed to send the certificate"); - let mut first_run = true; - task.make_progress( - &mut next_expected_height, - &mut first_run, - &CancellationToken::new(), - ) - .await - .unwrap(); + settlement_client + .expect_fetch_settlement_nonce() + .once() + .with(eq(SETTLEMENT_TX_HASH_1)) + .returning(|_| { + Ok(Some(NonceInfo { + nonce: 1, + previous_max_fee_per_gas: 0, + previous_max_priority_fee_per_gas: None, + })) + }); - assert_eq!(next_expected_height, Height::ZERO); -} + state + .expect_update_settlement_tx_hash() + .once() + .withf(move |i, t, _f, _| *i == certificate_id && *t == SETTLEMENT_TX_HASH_1) + .returning(|_, _, _, _| Ok(())); -#[rstest] -#[test_log::test(tokio::test)] -#[timeout(Duration::from_secs(2))] -async fn process_next_certificate() { - let tmp = TempDBDir::new(); - let storage = new_storage(&tmp.path); - let mut settlement_client = MockSettlementClient::new(); - let mut certifier = MockCertifier::new(); - let clock_ref = clock(); - let network_id = 1.into(); - let (sender, certificate_stream) = mpsc::channel(100); + settlement_client + .expect_submit_certificate_settlement() + .once() + .withf(move |i, _| *i == certificate_id2) + .returning(move |_, _| Ok(SETTLEMENT_TX_HASH_2)); - let mut forest = Forest::default(); + settlement_client + .expect_fetch_settlement_nonce() + .once() + .with(eq(SETTLEMENT_TX_HASH_2)) + .returning(|_| { + Ok(Some(NonceInfo { + nonce: 2, + previous_max_fee_per_gas: 0, + previous_max_priority_fee_per_gas: None, + })) + }); - let certificate = create_test_certificate(&mut forest, Height::ZERO); - let certificate_id = certificate.hash(); - storage - .pending - .insert_pending_certificate(network_id, Height::ZERO, &certificate) - .expect("unable to insert certificate in pending"); + state + .expect_update_settlement_tx_hash() + .once() + .withf(move |i, t, _f, _| *i == certificate_id2 && *t == SETTLEMENT_TX_HASH_2) + .returning(|_, _, _, _| Ok(())); - storage - .state - .insert_certificate_header(&certificate, CertificateStatus::Pending) - .expect("Failed to insert certificate header"); + settlement_client + .expect_wait_for_settlement() + .once() + .withf(move |t, i| *t == SETTLEMENT_TX_HASH_1 && *i == certificate_id) + .returning(move |_, _| Ok((EpochNumber::ZERO, CertificateIndex::ZERO))); - let certificate2 = create_test_certificate(&mut forest, Height::new(1)); - let certificate_id2 = certificate2.hash(); + state + .expect_update_certificate_header_status() + .once() + .with(eq(certificate_id), eq(CertificateStatus::Settled)) + .returning(|_, _| Ok(())); - storage - .pending - .insert_pending_certificate(network_id, Height::new(1), &certificate2) - .expect("unable to insert certificate in pending"); - storage - .state - .insert_certificate_header(&certificate2, CertificateStatus::Pending) - .expect("Failed to insert certificate header"); + state + .expect_set_latest_settled_certificate_for_network() + .once() + .with( + eq(network_id), + eq(Height::ZERO), + eq(certificate_id), + eq(EpochNumber::ZERO), + eq(CertificateIndex::ZERO), + ) + .returning(|_, _, _, _, _| Ok(())); - setup_certifier_mock( - &mut certifier, - Arc::clone(&storage.pending), - network_id, - 2, - None, - ); + settlement_client + .expect_wait_for_settlement() + .once() + .withf(move |t, i| *t == SETTLEMENT_TX_HASH_2 && *i == certificate_id2) + .returning(move |_, _| Ok((EpochNumber::new(1), CertificateIndex::ZERO))); - setup_settlement_mock( - &mut settlement_client, - certificate_id, - SETTLEMENT_TX_HASH_1, - 1, - EpochNumber::ZERO, - CertificateIndex::ZERO, - ); + state + .expect_update_certificate_header_status() + .once() + .with(eq(certificate_id2), eq(CertificateStatus::Settled)) + .returning(|_, _| Ok(())); - setup_settlement_mock( - &mut settlement_client, - certificate_id2, - SETTLEMENT_TX_HASH_2, - 2, - EpochNumber::new(1), - CertificateIndex::ZERO, - ); + state + .expect_set_latest_settled_certificate_for_network() + .once() + .with( + eq(network_id), + eq(Height::new(1)), + eq(certificate_id2), + eq(EpochNumber::new(1)), + eq(CertificateIndex::ZERO), + ) + .returning(|_, _, _, _, _| Ok(())); let mut task = NetworkTask::new( - Arc::clone(&storage.pending), - Arc::clone(&storage.state), + Arc::new(pending), + Arc::new(state), Arc::new(certifier), Arc::new(settlement_client), clock_ref.clone(), @@ -762,17 +963,16 @@ async fn process_next_certificate() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; - let mut first_run = false; // Set to false since we're testing certificate processing, not initialization - // Send both certificate events sender .send(NewCertificate { certificate_id, height: Height::ZERO, }) .await - .expect("Failed to send first certificate"); + .expect("Failed to send the certificate"); sender .send(NewCertificate { @@ -780,10 +980,10 @@ async fn process_next_certificate() { height: Height::new(1), }) .await - .expect("Failed to send second certificate"); - - // Process first certificate + .expect("Failed to send the certificate"); + let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -793,11 +993,29 @@ async fn process_next_certificate() { assert_eq!(next_expected_height, Height::new(1)); - // Update clock for epoch transition + tokio::time::timeout( + Duration::from_millis(100), + task.make_progress( + &mut epochs, + &mut next_expected_height, + &mut first_run, + &CancellationToken::new(), + ), + ) + .await + .expect_err("Should have timed out"); + + assert_eq!(next_expected_height, Height::new(1)); + clock_ref.update_block_height(2); - // Process second certificate + clock_ref + .get_sender() + .send(agglayer_clock::Event::EpochEnded(EpochNumber::ZERO)) + .expect("Failed to send"); + let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -805,277 +1023,250 @@ async fn process_next_certificate() { .await .unwrap(); - assert_eq!(next_expected_height, Height::new(2)); -} - -#[rstest] -#[test_log::test(tokio::test)] -#[timeout(Duration::from_secs(2))] -async fn multiple_certificates_per_epoch_sequential() { - let tmp = TempDBDir::new(); - let storage = new_storage(&tmp.path); - let mut settlement_client = MockSettlementClient::new(); - let mut certifier = MockCertifier::new(); - let clock_ref = clock(); - let network_id = 1.into(); - let (sender, certificate_stream) = mpsc::channel(100); - - let num_certificates = 5; - let mut forest = Forest::default(); - let mut certificate_ids = Vec::new(); - - // Create and store multiple certificates - for i in 0..num_certificates { - let certificate = create_test_certificate(&mut forest, Height::new(i)); - let certificate_id = certificate.hash(); - certificate_ids.push(certificate_id); - - storage - .pending - .insert_pending_certificate(network_id, Height::new(i), &certificate) - .expect("unable to insert certificate in pending"); - - storage - .state - .insert_certificate_header(&certificate, CertificateStatus::Pending) - .expect("Failed to insert certificate header"); - } - - // Mock certifier to prove ALL certificates - setup_certifier_mock( - &mut certifier, - Arc::clone(&storage.pending), - network_id, - num_certificates as usize, - None, - ); - - // Mock settlement for ALL certificates (now that at_capacity_for_epoch is - // removed) - for i in 0..num_certificates { - let settlement_hash = SettlementTxHash::new(Digest([i as u8; 32])); - setup_settlement_mock( - &mut settlement_client, - certificate_ids[i as usize], - settlement_hash, - i + 1, - EpochNumber::ZERO, - CertificateIndex::new(i), - ); - } - - let mut task = NetworkTask::new( - Arc::clone(&storage.pending), - Arc::clone(&storage.state), - Arc::new(certifier), - Arc::new(settlement_client), - clock_ref.clone(), - network_id, - certificate_stream, - ) - .expect("Failed to create a new network task"); - - let mut next_expected_height = Height::ZERO; - - // Send all certificates - for i in 0..num_certificates { - sender - .send(NewCertificate { - certificate_id: certificate_ids[i as usize], - height: Height::new(i), - }) - .await - .expect("Failed to send the certificate"); - } - - // Process all certificates - they should all be proven and settled - let mut first_run = false; // Set to false to process certificates immediately - for i in 0..num_certificates { - task.make_progress( - &mut next_expected_height, - &mut first_run, - &CancellationToken::new(), - ) - .await - .unwrap(); - - // After settling each certificate, next_expected_height should increment - assert_eq!(next_expected_height, Height::new(i + 1)); - - // Verify certificate is settled - let header = storage - .state - .get_certificate_header(&certificate_ids[i as usize]) - .expect("Failed to get certificate header") - .expect("Certificate header not found"); - - assert_eq!( - header.status, - CertificateStatus::Settled, - "Certificate {} should be settled", - i - ); - } - - // This test demonstrates that multiple certificates can be processed and - // settled sequentially in the same epoch (at_capacity_for_epoch removed) + assert_eq!(next_expected_height, Height::new(2)); } #[rstest] #[tokio::test] #[timeout(Duration::from_secs(1))] -async fn reject_non_sequential_certificates() { +async fn timeout_certifier() { let mut pending = MockPendingStore::new(); let mut state = MockStateStore::new(); - let certifier = MockCertifier::new(); - let settlement_client = MockSettlementClient::new(); + let mut certifier = MockCertifier::new(); let clock_ref = clock(); let network_id = 1.into(); let (sender, certificate_stream) = mpsc::channel(100); - let certificate_height_2 = Certificate::new_for_test(network_id, Height::new(2)); - let certificate_id_2 = certificate_height_2.hash(); + let certificate = Certificate::new_for_test(network_id, Height::ZERO); + let certificate_id = certificate.hash(); + + pending + .expect_get_certificate() + .once() + .with(eq(network_id), eq(Height::ZERO)) + .returning(|network_id, height| Ok(Some(Certificate::new_for_test(network_id, height)))); + + state + .expect_get_certificate_header() + .once() + .with(eq(certificate_id)) + .returning(|certificate_id| { + Ok(Some(agglayer_types::CertificateHeader { + network_id: 1.into(), + height: Height::ZERO, + epoch_number: None, + certificate_index: None, + certificate_id: *certificate_id, + prev_local_exit_root: [1; 32].into(), + new_local_exit_root: [0; 32].into(), + metadata: Metadata::ZERO, + status: CertificateStatus::Pending, + settlement_tx_hash: None, + })) + }); + + certifier + .expect_certify() + .once() + .with(always(), eq(network_id), eq(Height::ZERO)) + .return_once(move |_new_state, _network_id, _height| { + Err(CertificationError::InternalError("TimedOut".to_string())) + }); + + let expected_error = String::from("Internal error: TimedOut"); state .expect_get_latest_settled_certificate_per_network() .once() .with(eq(network_id)) - .returning(|network_id| { - // Network last settled at height 0 - Ok(Some(( - *network_id, - agglayer_storage::columns::latest_settled_certificate_per_network::SettledCertificate( - CertificateId::default(), - Height::ZERO, - EpochNumber::ZERO, - CertificateIndex::ZERO, - ), - ))) - }); + .returning(|_| Ok(None)); + + state + .expect_update_certificate_header_status() + .once() + .withf(move |id, status| { + if *id != certificate_id { + return false; + } + let CertificateStatus::InError { error } = status else { + return false; + }; + let CertificateStatusError::InternalError(error) = &**error else { + return false; + }; + error.starts_with(&expected_error) + }) + .returning(|_, _| Ok(())); state .expect_read_local_network_state() .returning(|_| Ok(Default::default())); - // Certificate at height 2 should never be processed (expecting height 1) - pending - .expect_get_certificate() - .never() - .with(eq(network_id), eq(Height::new(2))) - .returning(|network_id, height| Ok(Some(Certificate::new_for_test(network_id, height)))); - let mut task = NetworkTask::new( Arc::new(pending), Arc::new(state), Arc::new(certifier), - Arc::new(settlement_client), + Arc::new(MockSettlementClient::new()), clock_ref.clone(), network_id, certificate_stream, ) .expect("Failed to create a new network task"); - let mut next_expected_height = Height::new(1); // Expecting height 1 after settling height 0 + let mut epochs = task.clock_ref.subscribe().unwrap(); + let mut next_expected_height = Height::ZERO; - // Try to send certificate at height 2 (skipping height 1) sender .send(NewCertificate { - certificate_id: certificate_id_2, - height: Height::new(2), + certificate_id, + height: Height::ZERO, }) .await .expect("Failed to send the certificate"); + let mut first_run = true; + task.make_progress( + &mut epochs, + &mut next_expected_height, + &mut first_run, + &CancellationToken::new(), + ) + .await + .unwrap(); - // The certificate at height 2 should be rejected because next_expected_height - // is 1 When make_progress receives a certificate with wrong height, it logs - // a warning and returns Ok(()) without processing it - - let mut first_run = false; - - // This should complete immediately and return Ok(()) because the certificate - // height doesn't match next_expected_height - let result = task - .make_progress( - &mut next_expected_height, - &mut first_run, - &CancellationToken::new(), - ) - .await; - - // Should return Ok(()) after rejecting the wrong-height certificate - assert!( - result.is_ok(), - "Should return Ok after rejecting wrong-height certificate" - ); - - // next_expected_height should remain unchanged - assert_eq!(next_expected_height, Height::new(1)); + assert_eq!(next_expected_height, Height::ZERO); } #[rstest] -#[tokio::test] -#[timeout(Duration::from_secs(1))] -async fn accept_sequential_certificates_in_order() { +#[test_log::test(tokio::test)] +#[timeout(Duration::from_secs(2))] +async fn process_next_certificate() { let tmp = TempDBDir::new(); let storage = new_storage(&tmp.path); - let mut certifier = MockCertifier::new(); let mut settlement_client = MockSettlementClient::new(); + + let mut certifier = MockCertifier::new(); let clock_ref = clock(); + let clock_sender = clock_ref.get_sender(); let network_id = 1.into(); let (sender, certificate_stream) = mpsc::channel(100); let mut forest = Forest::default(); - // Create certificates at heights 0 and 1. - let cert0 = create_test_certificate(&mut forest, Height::ZERO); - let cert_id_0 = cert0.hash(); - - let cert1 = create_test_certificate(&mut forest, Height::new(1)); - let cert_id_1 = cert1.hash(); - + let certificate = forest.apply_events( + &[(USDC, 10.try_into().unwrap())], + &[(USDC, 1.try_into().unwrap())], + ); + let certificate_id = certificate.hash(); storage .pending - .insert_pending_certificate(network_id, Height::ZERO, &cert0) - .expect("unable to insert certificate"); + .insert_pending_certificate(network_id, Height::ZERO, &certificate) + .expect("unable to insert certificate in pending"); + storage .state - .insert_certificate_header(&cert0, CertificateStatus::Pending) + .insert_certificate_header(&certificate, CertificateStatus::Pending) .expect("Failed to insert certificate header"); + let mut certificate = forest.apply_events(&[], &[(USDC, 1.try_into().unwrap())]); + certificate.height = Height::new(1); + let certificate_id2 = certificate.hash(); + storage .pending - .insert_pending_certificate(network_id, Height::new(1), &cert1) - .expect("unable to insert certificate"); + .insert_pending_certificate(network_id, Height::new(1), &certificate) + .expect("unable to insert certificate in pending"); storage .state - .insert_certificate_header(&cert1, CertificateStatus::Pending) + .insert_certificate_header(&certificate, CertificateStatus::Pending) .expect("Failed to insert certificate header"); - setup_certifier_mock( - &mut certifier, - Arc::clone(&storage.pending), - network_id, - 2, - None, - ); + let pending_store = storage.pending.clone(); + certifier + .expect_certify() + .times(2) + .with( + always(), + eq(network_id), + in_iter(vec![Height::ZERO, Height::new(1)]), + ) + .returning(move |mut new_state, network, height| { + let certificate = pending_store + .get_certificate(network, height) + .expect("Failed to get certificate") + .expect("Certificate not found"); + + let signer = agglayer_types::Address::new([0; 20]); + let ctx_from_l1 = L1WitnessCtx { + l1_info_root: certificate + .l1_info_root() + .expect("Failed to get L1 info root") + .unwrap_or_default(), + prev_pessimistic_root: PessimisticRootInput::Computed( + PessimisticRootCommitmentVersion::V2, + ), + aggchain_data_ctx: CertificateAggchainDataCtx::LegacyEcdsa { signer }, + }; - setup_settlement_mock( - &mut settlement_client, - cert_id_0, - SETTLEMENT_TX_HASH_1, - 1, - EpochNumber::ZERO, - CertificateIndex::ZERO, - ); + let _ = new_state + .apply_certificate(&certificate, ctx_from_l1) + .expect("Failed to apply certificate"); - setup_settlement_mock( - &mut settlement_client, - cert_id_1, - SETTLEMENT_TX_HASH_2, - 2, - EpochNumber::ZERO, - CertificateIndex::new(1), - ); + Ok(CertifierOutput { + certificate, + height, + new_state, + network, + new_pp_root: Digest::ZERO, + }) + }); + + settlement_client + .expect_submit_certificate_settlement() + .once() + .withf(move |i, _| *i == certificate_id) + .returning(move |_, _| Ok(SETTLEMENT_TX_HASH_1)); + + settlement_client + .expect_fetch_settlement_nonce() + .once() + .with(eq(SETTLEMENT_TX_HASH_1)) + .returning(|_| { + Ok(Some(NonceInfo { + nonce: 1, + previous_max_fee_per_gas: 0, + previous_max_priority_fee_per_gas: None, + })) + }); + + settlement_client + .expect_submit_certificate_settlement() + .once() + .withf(move |i, _| *i == certificate_id2) + .returning(move |_, _| Ok(SETTLEMENT_TX_HASH_2)); + + settlement_client + .expect_fetch_settlement_nonce() + .once() + .with(eq(SETTLEMENT_TX_HASH_2)) + .returning(|_| { + Ok(Some(NonceInfo { + nonce: 2, + previous_max_fee_per_gas: 0, + previous_max_priority_fee_per_gas: None, + })) + }); + + settlement_client + .expect_wait_for_settlement() + .once() + .withf(move |t, i| *t == SETTLEMENT_TX_HASH_1 && *i == certificate_id) + .returning(move |_, _| Ok((EpochNumber::ZERO, CertificateIndex::ZERO))); + + settlement_client + .expect_wait_for_settlement() + .once() + .withf(move |t, i| *t == SETTLEMENT_TX_HASH_2 && *i == certificate_id2) + .returning(move |_, _| Ok((EpochNumber::new(1), CertificateIndex::ZERO))); let mut task = NetworkTask::new( Arc::clone(&storage.pending), @@ -1088,148 +1279,69 @@ async fn accept_sequential_certificates_in_order() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; - let mut first_run = false; - - // Send certificate at height 1 first; it should be ignored while height 0 is - // expected. - sender - .send(NewCertificate { - certificate_id: cert_id_1, - height: Height::new(1), - }) - .await - .expect("Failed to send certificate"); - - task.make_progress( - &mut next_expected_height, - &mut first_run, - &CancellationToken::new(), - ) - .await - .expect("Failed to process out-of-order certificate"); - - assert_eq!(next_expected_height, Height::ZERO); - let header_1 = storage - .state - .get_certificate_header(&cert_id_1) - .expect("Failed to get certificate header") - .expect("Certificate header not found"); - assert_eq!(header_1.status, CertificateStatus::Pending); - // Process height 0. sender .send(NewCertificate { - certificate_id: cert_id_0, + certificate_id, height: Height::ZERO, }) .await - .expect("Failed to send certificate"); - + .expect("Failed to send the certificate"); + let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), ) .await - .expect("Failed to process first certificate"); + .unwrap(); assert_eq!(next_expected_height, Height::new(1)); - let header_0 = storage - .state - .get_certificate_header(&cert_id_0) - .expect("Failed to get certificate header") - .expect("Certificate header not found"); - assert_eq!(header_0.status, CertificateStatus::Settled); - - // Re-send height 1 now that it is expected. - sender - .send(NewCertificate { - certificate_id: cert_id_1, - height: Height::new(1), - }) - .await - .expect("Failed to send certificate"); + clock_ref.update_block_height(2); + _ = clock_sender.send(agglayer_clock::Event::EpochEnded(EpochNumber::ZERO)); task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), ) .await - .expect("Failed to process second certificate"); - + .unwrap(); assert_eq!(next_expected_height, Height::new(2)); - let header_1 = storage - .state - .get_certificate_header(&cert_id_1) - .expect("Failed to get certificate header") - .expect("Certificate header not found"); - assert_eq!(header_1.status, CertificateStatus::Settled); } #[rstest] #[test_log::test(tokio::test)] -#[timeout(Duration::from_secs(3))] -async fn process_multiple_certificates_across_epochs_from_pending() { - let tmp = TempDBDir::new(); - let storage = new_storage(&tmp.path); - let mut settlement_client = MockSettlementClient::new(); - let mut certifier = MockCertifier::new(); +#[timeout(Duration::from_secs(1))] +async fn epoch_jammed(#[values(false, true)] at_capacity: bool) { + let mut pending = MockPendingStore::new(); + let mut state = MockStateStore::new(); + let certifier = MockCertifier::new(); + let settlement_client = MockSettlementClient::new(); let clock_ref = clock(); + let epoch_sender = clock_ref.get_sender(); let network_id = 1.into(); - let (sender, certificate_stream) = mpsc::channel(100); + let (_sender, certificate_stream) = mpsc::channel(1); - let num_certificates = 5; - let mut forest = Forest::default(); - let mut certificate_ids = Vec::new(); - - // Create and store ALL certificates in pending store upfront - for i in 0..num_certificates { - let certificate = create_test_certificate(&mut forest, Height::new(i)); - let certificate_id = certificate.hash(); - certificate_ids.push(certificate_id); - - storage - .pending - .insert_pending_certificate(network_id, Height::new(i), &certificate) - .expect("unable to insert certificate in pending"); - - storage - .state - .insert_certificate_header(&certificate, CertificateStatus::Pending) - .expect("Failed to insert certificate header"); - } + state + .expect_read_local_network_state() + .returning(|_| Ok(Default::default())); - // Mock certifier to prove ALL certificates - setup_certifier_mock( - &mut certifier, - Arc::clone(&storage.pending), - network_id, - num_certificates as usize, - None, - ); + state + .expect_get_latest_settled_certificate_per_network() + .once() + .with(eq(network_id)) + .returning(|_| Ok(None)); - // Mock settlement for ALL certificates - for i in 0..num_certificates { - let cert_id = certificate_ids[i as usize]; - let settlement_hash = SettlementTxHash::new(Digest([i as u8; 32])); - let epoch = EpochNumber::new(i / 2); // 2 certificates per epoch - let index = CertificateIndex::new(i % 2); - - setup_settlement_mock( - &mut settlement_client, - cert_id, - settlement_hash, - i + 1, - epoch, - index, - ); - } + pending.expect_get_certificate().returning(|_, _| Ok(None)); let mut task = NetworkTask::new( - Arc::clone(&storage.pending), - Arc::clone(&storage.state), + Arc::new(pending), + Arc::new(state), Arc::new(certifier), Arc::new(settlement_client), clock_ref.clone(), @@ -1238,69 +1350,39 @@ async fn process_multiple_certificates_across_epochs_from_pending() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; - let mut first_run = false; // Set to false to process certificates immediately - - // Process certificates one by one, triggering epoch transitions - for i in 0..num_certificates { - // Send the certificate event - sender - .send(NewCertificate { - certificate_id: certificate_ids[i as usize], - height: Height::new(i), - }) - .await - .expect("Failed to send certificate"); - // Process the certificate - task.make_progress( - &mut next_expected_height, - &mut first_run, - &CancellationToken::new(), - ) - .await - .expect("Failed to process certificate"); - - // Verify certificate is settled - let header = storage - .state - .get_certificate_header(&certificate_ids[i as usize]) - .expect("Failed to get certificate header") - .expect("Certificate header not found"); - assert_eq!( - header.status, - CertificateStatus::Settled, - "Certificate {} should be settled", - i - ); - - // After every 2 certificates, trigger epoch transition - if i > 0 && i % 2 == 1 { - clock_ref.update_block_height((i + 1) * 2); - } - - // Next expected height should increment - assert_eq!(next_expected_height, Height::new(i + 1)); + // Jam the epoch channel with a bunch of epoch events. + for epoch_no in 1..=105 { + epoch_sender + .send(agglayer_clock::Event::EpochEnded(EpochNumber::new( + epoch_no, + ))) + .unwrap(); } - // Verify ALL certificates were settled - for i in 0..num_certificates { - let header = storage - .state - .get_certificate_header(&certificate_ids[i as usize]) - .expect("Failed to get certificate header") - .expect("Certificate header not found"); - assert_eq!( - header.status, - CertificateStatus::Settled, - "Certificate {} should be settled", - i - ); - } + // Just make sure it does not panic or time out when epoch events are skipped. + let mut first_run = false; + task.at_capacity_for_epoch = at_capacity; + task.make_progress( + &mut epochs, + &mut next_expected_height, + &mut first_run, + &CancellationToken::new(), + ) + .await + .unwrap(); + assert_eq!(task.at_capacity_for_epoch, at_capacity); - // This test demonstrates: - // 1. Multiple certificates can be processed sequentially - // 2. Epoch transitions don't block certificate processing - // 3. Certificates are automatically picked from pending store - // 4. All certificates settle successfully across multiple epochs + // Taking the next item from the channel should advance the epoch. + task.make_progress( + &mut epochs, + &mut next_expected_height, + &mut first_run, + &CancellationToken::new(), + ) + .await + .unwrap(); + assert!(!task.at_capacity_for_epoch); } diff --git a/crates/agglayer-certificate-orchestrator/src/network_task/tests/status.rs b/crates/agglayer-certificate-orchestrator/src/network_task/tests/status.rs index 83cb0b222..180baaef7 100644 --- a/crates/agglayer-certificate-orchestrator/src/network_task/tests/status.rs +++ b/crates/agglayer-certificate-orchestrator/src/network_task/tests/status.rs @@ -133,9 +133,11 @@ async fn from_pending_to_settled() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -264,9 +266,11 @@ async fn from_proven_to_settled() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -377,9 +381,11 @@ async fn from_candidate_to_settled() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::ZERO; let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), @@ -442,9 +448,11 @@ async fn from_settled_to_settled() { ) .expect("Failed to create a new network task"); + let mut epochs = task.clock_ref.subscribe().unwrap(); let mut next_expected_height = Height::new(1); let mut first_run = true; task.make_progress( + &mut epochs, &mut next_expected_height, &mut first_run, &CancellationToken::new(), diff --git a/crates/agglayer-storage/src/stores/per_epoch/mod.rs b/crates/agglayer-storage/src/stores/per_epoch/mod.rs index 9c717f09d..f4fde98f5 100644 --- a/crates/agglayer-storage/src/stores/per_epoch/mod.rs +++ b/crates/agglayer-storage/src/stores/per_epoch/mod.rs @@ -35,6 +35,8 @@ mod cf_definitions; #[cfg(test)] mod tests; +const MAX_CERTIFICATE_PER_EPOCH: u64 = 1; + /// A logical store for an Epoch. pub struct PerEpochStore { pub epoch_number: Arc, @@ -325,6 +327,8 @@ where ); // Adding the network to the end checkpoint. end_checkpoint_entry_assignment = Some(Height::ZERO); + + // Adding the certificate to the DB } // If the network is not found in the end checkpoint and the height is not 0, // this is an invalid certificate candidate and the operation should fail. @@ -336,23 +340,18 @@ where (Some(_start_height), Entry::Occupied(ref current_height)) if height == Height::ZERO => { - debug!( - "{}Failed certificate candidate for network {}: height is {} but network is \ - already present in the end checkpoint with height {}", - mode.prefix(), - network_id, - height, - current_height.get() - ); return Err(CertificateCandidateError::UnexpectedHeight( network_id, height, *current_height.get(), - ))?; + ))? } // If the network is found in the end checkpoint and the height minus one is equal to // the current network height. We can add the certificate. - (_, Entry::Occupied(current_height)) if current_height.get().next() == height => { + (Some(start_height), Entry::Occupied(current_height)) + if current_height.get().next() == height + && height.distance_since(start_height) <= MAX_CERTIFICATE_PER_EPOCH => + { debug!( "{}Certificate candidate for network {} at height {} accepted", mode.prefix(), @@ -364,19 +363,11 @@ where } (_, Entry::Occupied(current_height)) => { - debug!( - "{}Failed certificate candidate for network {}: current height is {} \ - submitted certificate height is {}", - mode.prefix(), - network_id, - current_height.get(), - height, - ); return Err(CertificateCandidateError::UnexpectedHeight( network_id, height, *current_height.get(), - ))?; + ))? } } diff --git a/crates/agglayer-storage/src/stores/per_epoch/tests.rs b/crates/agglayer-storage/src/stores/per_epoch/tests.rs index 69d458003..6b41467ab 100644 --- a/crates/agglayer-storage/src/stores/per_epoch/tests.rs +++ b/crates/agglayer-storage/src/stores/per_epoch/tests.rs @@ -179,7 +179,7 @@ fn adding_a_certificate( #[case::when_state_are_empty( StartCheckpointState::Empty, EndCheckpointState::Empty, - VecDeque::from([|result: Result<_, Error>| result.is_ok(), |result: Result<_, Error>| result.is_ok()]), + VecDeque::from([|result: Result<_, Error>| result.is_ok(), |result: Result<_, Error>| result.is_err()]), Height::ZERO)] #[case::when_state_are_empty_and_starting_at_wrong_height( StartCheckpointState::Empty, @@ -189,7 +189,7 @@ fn adding_a_certificate( #[case::when_state_is_already_full( StartCheckpointState::Empty, EndCheckpointState::WithCheckpoint(vec![(NetworkId::new(0), Height::ZERO)]), - VecDeque::from([|result: Result<_, Error>| result.is_ok()]), + VecDeque::from([|result: Result<_, Error>| result.is_err()]), Height::new(1))] #[case::when_state_contains_other_network( StartCheckpointState::Empty, @@ -418,218 +418,3 @@ fn can_retrieve_proof_at_index() { "Should return None for non-existent index" ); } - -#[rstest] -#[case::five_certificates(5)] -#[case::ten_certificates(10)] -#[case::fifty_certificates(50)] -fn can_add_multiple_certificates_per_epoch( - store: PerEpochStore, - #[case] num_certificates: u64, -) { - let pending_store = store.pending_store.clone(); - let state_store = store.state_store.clone(); - let network = NetworkId::new(0); - - // Add multiple certificates sequentially - for i in 0..num_certificates { - let height = Height::new(i); - let certificate = Certificate::new_for_test(network, height); - let certificate_id = certificate.hash(); - - state_store - .insert_certificate_header(&certificate, CertificateStatus::Proven) - .unwrap(); - - pending_store - .insert_pending_certificate(network, height, &certificate) - .unwrap(); - - pending_store - .insert_generated_proof(&certificate_id, &Proof::dummy()) - .unwrap(); - - let result = store.add_certificate(certificate_id, agglayer_types::ExecutionMode::Default); - assert!( - result.is_ok(), - "Failed to add certificate at height {}: {:?}", - i, - result - ); - - let (epoch_number, certificate_index) = result.unwrap(); - assert_eq!(epoch_number, EpochNumber::ZERO); - assert_eq!(certificate_index, CertificateIndex::new(i)); - } - - // Verify all certificates can be retrieved - for i in 0..num_certificates { - let certificate = store - .get_certificate_at_index(CertificateIndex::new(i)) - .unwrap(); - assert!( - certificate.is_some(), - "Certificate at index {} should exist", - i - ); - - let cert = certificate.unwrap(); - assert_eq!(cert.network_id, network); - assert_eq!(cert.height, Height::new(i)); - } - - // Verify end checkpoint is updated correctly - let end_checkpoint = store.get_end_checkpoint(); - assert_eq!( - end_checkpoint.get(&network), - Some(&Height::new(num_certificates - 1)) - ); -} - -#[rstest] -fn multiple_networks_multiple_certificates_per_epoch( - store: PerEpochStore, -) { - let pending_store = store.pending_store.clone(); - let state_store = store.state_store.clone(); - - // Add 10 certificates for 3 different networks - let networks = [NetworkId::new(0), NetworkId::new(1), NetworkId::new(2)]; - let certificates_per_network = 10; - - for network in networks.iter() { - for i in 0..certificates_per_network { - let height = Height::new(i); - let certificate = Certificate::new_for_test(*network, height); - let certificate_id = certificate.hash(); - - state_store - .insert_certificate_header(&certificate, CertificateStatus::Proven) - .unwrap(); - - pending_store - .insert_pending_certificate(*network, height, &certificate) - .unwrap(); - - pending_store - .insert_generated_proof(&certificate_id, &Proof::dummy()) - .unwrap(); - - let result = - store.add_certificate(certificate_id, agglayer_types::ExecutionMode::Default); - assert!( - result.is_ok(), - "Failed to add certificate for network {} at height {}: {:?}", - network, - i, - result - ); - } - } - - // Verify end checkpoints for all networks - let end_checkpoint = store.get_end_checkpoint(); - for network in networks.iter() { - assert_eq!( - end_checkpoint.get(network), - Some(&Height::new(certificates_per_network - 1)), - "Network {} should have end checkpoint at height {}", - network, - certificates_per_network - 1 - ); - } - - // Verify total number of certificates - let mut count = 0; - for i in 0..(networks.len() as u64 * certificates_per_network) { - if store - .get_certificate_at_index(CertificateIndex::new(i)) - .unwrap() - .is_some() - { - count += 1; - } - } - assert_eq!( - count, - networks.len() as u64 * certificates_per_network, - "Should have {} total certificates", - networks.len() * certificates_per_network as usize - ); -} - -#[rstest] -fn sequential_validation_still_enforced(store: PerEpochStore) { - let pending_store = store.pending_store.clone(); - let state_store = store.state_store.clone(); - let network = NetworkId::new(0); - - // Add first certificate at height 0 - let height0 = Height::ZERO; - let certificate0 = Certificate::new_for_test(network, height0); - let certificate_id0 = certificate0.hash(); - - pending_store - .insert_pending_certificate(network, height0, &certificate0) - .unwrap(); - - let height1 = Height::new(1); - let certificate1 = Certificate::new_for_test(network, height1); - let certificate_id1 = certificate1.hash(); - - pending_store - .insert_pending_certificate(network, height1, &certificate1) - .unwrap(); - - let height2 = Height::new(2); - let certificate2 = Certificate::new_for_test(network, height2); - let certificate_id2 = certificate2.hash(); - - pending_store - .insert_pending_certificate(network, height2, &certificate2) - .unwrap(); - - state_store - .insert_certificate_header(&certificate0, CertificateStatus::Proven) - .unwrap(); - - pending_store - .insert_generated_proof(&certificate_id0, &Proof::dummy()) - .unwrap(); - - assert!(store - .add_certificate(certificate_id0, agglayer_types::ExecutionMode::Default) - .is_ok()); - - state_store - .insert_certificate_header(&certificate2, CertificateStatus::Proven) - .unwrap(); - - pending_store - .insert_generated_proof(&certificate_id2, &Proof::dummy()) - .unwrap(); - - let result = store.add_certificate(certificate_id2, agglayer_types::ExecutionMode::Default); - assert!( - result.is_err(), - "Should reject certificate with gap in heights" - ); - assert!(matches!( - result, - Err(Error::CertificateCandidateError( - crate::error::CertificateCandidateError::UnexpectedHeight(_, _, _) - )) - )); - - state_store - .insert_certificate_header(&certificate1, CertificateStatus::Proven) - .unwrap(); - - pending_store - .insert_generated_proof(&certificate_id1, &Proof::dummy()) - .unwrap(); - - assert!(store - .add_certificate(certificate_id1, agglayer_types::ExecutionMode::Default) - .is_ok()); -}