Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions crates/agglayer-certificate-orchestrator/src/network_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use agglayer_types::{
use pessimistic_proof::{
core::commitment::PessimisticRootCommitmentVersion, local_state::StateCommitment,
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};

Expand Down Expand Up @@ -137,11 +137,14 @@ pub(crate) struct NetworkTask<CertifierClient, SettlementClient, PendingStore, S
settlement_client: Arc<SettlementClient>,
/// The local network state of the network task.
local_state: Box<LocalNetworkStateData>,

/// The clock reference to subscribe to the epoch events and check for
/// current epoch.
clock_ref: ClockRef,
/// The stream of new certificates to certify.
certificate_stream: mpsc::Receiver<NewCertificate>,
/// Flag to indicate if the network is at capacity for the current epoch.
at_capacity_for_epoch: bool,
/// latest certificate settled
latest_settled: Option<SettledCertificate>,
}
Expand Down Expand Up @@ -190,6 +193,7 @@ where
local_state,
clock_ref,
certificate_stream,
at_capacity_for_epoch: false,
latest_settled,
settlement_client,
})
Expand All @@ -208,6 +212,8 @@ where
) -> Result<NetworkId, Error> {
info!("Starting the network task for network {}", self.network_id);

let mut stream_epoch = self.clock_ref.subscribe()?;

let current_epoch = self.clock_ref.current_epoch();

// Start from the latest settled certificate to define the next expected height
Expand All @@ -221,6 +227,7 @@ where
debug!("Current network height is {}", current_height);
if epoch == current_epoch {
debug!("Already settled for the epoch {current_epoch}");
self.at_capacity_for_epoch = true;
}

current_height.next()
Expand All @@ -239,7 +246,7 @@ where
return Ok(self.network_id);
}

result = self.make_progress(&mut next_expected_height, &mut first_run, &cancellation_token) => {
result = self.make_progress(&mut stream_epoch, &mut next_expected_height, &mut first_run, &cancellation_token) => {
if let Err(error)= result {
error!("Error during the certification process: {}", error);

Expand All @@ -253,9 +260,10 @@ where
}
}

#[instrument(skip(self, cancellation_token), fields(certificate_id))]
#[instrument(skip(self, stream_epoch, cancellation_token), fields(certificate_id))]
async fn make_progress(
&mut self,
stream_epoch: &mut tokio::sync::broadcast::Receiver<agglayer_clock::Event>,
next_expected_height: &mut Height,
first_run: &mut bool,
cancellation_token: &CancellationToken,
Expand All @@ -264,7 +272,39 @@ where
*first_run = false;
} else {
tokio::select! {
Some(NewCertificate { certificate_id, height, .. }) = self.certificate_stream.recv() => {
event = stream_epoch.recv() => {
let network_id = self.network_id;
match event {
Ok(agglayer_clock::Event::EpochEnded(epoch)) => {
info!("Received an epoch event: {}", epoch);

let current_epoch = self.clock_ref.current_epoch();
if epoch != EpochNumber::ZERO && epoch.next() < current_epoch {
warn!("Received an epoch event for epoch {epoch} which is outdated, current epoch is {current_epoch}");

return Ok(());
}
match self.latest_settled {
Some(SettledCertificate(_, _, epoch, _)) if epoch == current_epoch => {
warn!("Network {network_id} is at capacity for the epoch {current_epoch}");
return Ok(());
},
_ => {
self.at_capacity_for_epoch = false;
}
}
}
Err(broadcast::error::RecvError::Lagged(num_skipped)) => {
warn!("Network {network_id} skipped {num_skipped} epoch ticks");
return Ok(());
}
Err(broadcast::error::RecvError::Closed) => {
error!("Epoch channel closed for network {network_id}");
return Err(Error::InternalError("epoch channel closed".into()));
}
}
}
Some(NewCertificate { certificate_id, height, .. }) = self.certificate_stream.recv(), if !self.at_capacity_for_epoch => {
info!(
hash = certificate_id.to_string(),
"Received a certificate event for {certificate_id} at height {height}"
Expand Down Expand Up @@ -505,6 +545,7 @@ where
continue;
}
Some(NetworkTaskMessage::CertificateSettled { settled_certificate, height, .. }) => {
self.at_capacity_for_epoch = true;
let epoch_number = settled_certificate.2;
let certificate_index = settled_certificate.3;
self.latest_settled = Some(settled_certificate);
Expand Down Expand Up @@ -547,6 +588,7 @@ where
}
Some(NetworkTaskMessage::CertificateErrored { .. }) => {
// The certificate task already logged everything that should be logged.
self.at_capacity_for_epoch = false;
break;
}
Some(NetworkTaskMessage::CheckSettlementTx { settlement_tx_hash, tx_mined_notifier, .. }) => {
Expand Down
Loading
Loading