[consensus] Add PeerRoundTracker component which will track highest accepted and received rounds and calculate quorum rounds.
arun-koshy committed Feb 11, 2025
1 parent c33d3af commit 4852e29
Showing 11 changed files with 934 additions and 322 deletions.
13 changes: 13 additions & 0 deletions consensus/config/src/parameters.rs
@@ -59,6 +59,10 @@ pub struct Parameters {
#[serde(default = "Parameters::default_propagation_delay_stop_proposal_threshold")]
pub propagation_delay_stop_proposal_threshold: u32,

/// Interval in milliseconds to update quorum rounds of peers.
#[serde(default = "Parameters::default_quorum_round_update_interval_ms")]
pub quorum_round_update_interval_ms: u64,

/// The number of rounds of blocks to be kept in the Dag state cache per authority. The larger
/// the number the more the blocks that will be kept in memory allowing minimising any potential
/// disk access.
@@ -159,6 +163,14 @@ impl Parameters {
}
}

pub(crate) fn default_quorum_round_update_interval_ms() -> u64 {
if cfg!(msim) {
1000
} else {
5000
}
}
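
Since both the serde default and the `Default` impl below point at the same helper, a config that omits the new field deserializes to the same value the node would use anyway. A minimal sketch of that expectation (a hypothetical check, not part of this commit):

```rust
// Hypothetical sanity check, not part of this commit: an omitted
// `quorum_round_update_interval_ms` falls back to the helper's default
// (5_000 ms outside of msim, 1_000 ms under msim).
let params = consensus_config::Parameters::default();
assert_eq!(params.quorum_round_update_interval_ms, 5_000);
```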

pub(crate) fn default_dag_state_cached_rounds() -> u32 {
if cfg!(msim) {
// Exercise reading blocks from store.
@@ -202,6 +214,7 @@ impl Default for Parameters {
round_prober_request_timeout_ms: Parameters::default_round_prober_request_timeout_ms(),
propagation_delay_stop_proposal_threshold:
Parameters::default_propagation_delay_stop_proposal_threshold(),
quorum_round_update_interval_ms: Parameters::default_quorum_round_update_interval_ms(),
dag_state_cached_rounds: Parameters::default_dag_state_cached_rounds(),
commit_sync_parallel_fetches: Parameters::default_commit_sync_parallel_fetches(),
commit_sync_batch_size: Parameters::default_commit_sync_batch_size(),
4 changes: 2 additions & 2 deletions consensus/core/src/ancestor.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
use consensus_config::AuthorityIndex;
use tracing::info;

use crate::{context::Context, leader_scoring::ReputationScores, round_prober::QuorumRound};
use crate::{context::Context, leader_scoring::ReputationScores, round_tracker::QuorumRound};

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum AncestorState {
@@ -61,7 +61,7 @@ pub(crate) struct AncestorStateManager {

impl AncestorStateManager {
// Number of quorum round updates for which an ancestor is locked in the EXCLUDE state
// Chose 10 updates as that should be ~50 seconds of waiting with the current round prober
// Chose 10 updates as that should be ~50 seconds of waiting with the current round tracker
// interval of 5s
#[cfg(not(test))]
const STATE_LOCK_QUORUM_ROUND_UPDATES: u32 = 10;
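
The "~50 seconds" in the comment above is simply the lock length multiplied by the update interval; a quick illustration of the arithmetic:

```rust
// 10 quorum-round updates * 5_000 ms per update = 50_000 ms (~50 s)
// that an ancestor remains locked in the EXCLUDE state.
let lock_duration_ms = 10u64 * 5_000;
assert_eq!(lock_duration_ms, 50_000);
```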
30 changes: 26 additions & 4 deletions consensus/core/src/authority_node.rs
@@ -30,6 +30,7 @@ use crate::{
NetworkManager,
},
round_prober::{RoundProber, RoundProberHandle},
round_tracker::{PeerRoundTracker, QuorumRoundManager, QuorumRoundManagerHandle},
storage::rocksdb_store::RocksDBStore,
subscriber::Subscriber,
synchronizer::{Synchronizer, SynchronizerHandle},
@@ -157,6 +158,7 @@ where
subscriber: Option<Subscriber<N::Client, AuthorityService<ChannelCoreThreadDispatcher>>>,
network_manager: N,
sync_last_known_own_block: bool,
quorum_round_manager_handle: QuorumRoundManagerHandle,
}

impl<N> AuthorityNode<N>
@@ -264,6 +266,8 @@ where
leader_schedule.clone(),
);

let round_tracker = Arc::new(RwLock::new(PeerRoundTracker::new(context.clone())));

let core = Core::new(
context.clone(),
leader_schedule,
@@ -278,6 +282,7 @@
protocol_keypair,
dag_state.clone(),
sync_last_known_own_block,
round_tracker.clone(),
);

let (core_dispatcher, core_thread_handle) =
@@ -316,13 +321,21 @@ where
core_dispatcher.clone(),
dag_state.clone(),
network_client.clone(),
round_tracker.clone(),
)
.start(),
)
} else {
None
};

let quorum_round_manager_handle = QuorumRoundManager::new(
context.clone(),
core_dispatcher.clone(),
round_tracker.clone(),
)
.start();
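
This excerpt does not include `round_tracker.rs`, so the manager's internals are not visible here; the wiring above only shows that it receives a clone of the shared `Arc<RwLock<PeerRoundTracker>>` and the core dispatcher, and runs until `stop()` is awaited during shutdown. A self-contained sketch of that general pattern, driven by `quorum_round_update_interval_ms` (all names and logic below are placeholders, not the actual implementation):

```rust
use std::{sync::Arc, time::Duration};

use parking_lot::RwLock;

// Placeholder stand-in for PeerRoundTracker; the real type and its methods
// live in round_tracker.rs, which is not part of the excerpt shown here.
struct TrackerSketch {
    highest_accepted_rounds: Vec<u32>, // one entry per peer authority (assumed non-empty)
}

impl TrackerSketch {
    // Placeholder computation: the real tracker derives stake-weighted
    // (low, high) quorum rounds per authority.
    fn quorum_rounds_sketch(&self) -> (u32, u32) {
        let mut rounds = self.highest_accepted_rounds.clone();
        rounds.sort_unstable();
        (rounds[rounds.len() / 3], rounds[(2 * rounds.len()) / 3])
    }
}

// Periodic update loop over the shared tracker.
async fn run_update_loop(tracker: Arc<RwLock<TrackerSketch>>, interval_ms: u64) {
    let mut ticker = tokio::time::interval(Duration::from_millis(interval_ms));
    loop {
        ticker.tick().await;
        let (low, high) = tracker.read().quorum_rounds_sketch();
        tracing::info!("quorum rounds updated: low={low}, high={high}");
    }
}
```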

let network_service = Arc::new(AuthorityService::new(
context.clone(),
block_verifier,
@@ -372,6 +385,7 @@ where
subscriber,
network_manager,
sync_last_known_own_block,
quorum_round_manager_handle,
}
}

@@ -413,6 +427,8 @@ where
.node_metrics
.uptime
.observe(self.start_time.elapsed().as_secs_f64());

self.quorum_round_manager_handle.stop().await;
}

pub(crate) fn transaction_client(&self) -> Arc<TransactionClient> {
@@ -428,8 +444,11 @@ where
mod tests {
#![allow(non_snake_case)]

use std::collections::BTreeMap;
use std::{collections::BTreeSet, sync::Arc, time::Duration};
use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
time::Duration,
};

use consensus_config::{local_committee_and_keys, Parameters};
use mysten_metrics::monitored_mpsc::UnboundedReceiver;
@@ -441,8 +460,11 @@ mod tests {
use typed_store::DBMetrics;

use super::*;
use crate::block::GENESIS_ROUND;
use crate::{block::BlockAPI as _, transaction::NoopTransactionVerifier, CommittedSubDag};
use crate::{
block::{BlockAPI as _, GENESIS_ROUND},
transaction::NoopTransactionVerifier,
CommittedSubDag,
};

#[rstest]
#[tokio::test]
38 changes: 26 additions & 12 deletions consensus/core/src/authority_service.rs
@@ -211,7 +211,7 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {

let missing_ancestors = self
.core_dispatcher
.add_blocks(vec![verified_block])
.add_blocks(vec![verified_block.clone()])
.await
.map_err(|_| ConsensusError::Shutdown)?;
if !missing_ancestors.is_empty() {
@@ -245,6 +245,14 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
excluded_ancestors.truncate(excluded_ancestors_limit);
}

self.core_dispatcher
.set_peer_accepted_rounds_from_block(ExtendedBlock {
block: verified_block,
excluded_ancestors: excluded_ancestors.clone(),
})
.await
.map_err(|_| ConsensusError::Shutdown)?;
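
The call above relies on a new method on the `CoreThreadDispatcher` trait; the trait definition itself lives in the `core_thread` module, which is not among the files shown, but its shape can be read off the `FakeCoreThreadDispatcher` test implementation further down in this diff:

```rust
// Inferred signature of the trait addition (taken from the test mock below;
// the authoritative definition lives in the `core_thread` module):
async fn set_peer_accepted_rounds_from_block(
    &self,
    extended_block: ExtendedBlock,
) -> Result<(), CoreError>;
```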

self.context
.metrics
.node_metrics
@@ -682,31 +690,30 @@ async fn make_recv_future<T: Clone>(

#[cfg(test)]
mod tests {
use std::{collections::BTreeSet, sync::Arc, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use parking_lot::{Mutex, RwLock};
use tokio::{sync::broadcast, time::sleep};

use crate::{
authority_service::AuthorityService,
block::{BlockAPI, BlockRef, SignedBlock, TestBlock, VerifiedBlock},
block::{BlockAPI, BlockRef, ExtendedBlock, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
dag_state::DagState,
error::ConsensusResult,
network::{BlockStream, ExtendedSerializedBlock, NetworkClient, NetworkService},
round_prober::QuorumRound,
round_tracker::QuorumRound,
storage::mem_store::MemStore,
synchronizer::Synchronizer,
test_dag_builder::DagBuilder,
Round,
};
use async_trait::async_trait;
use bytes::Bytes;
use consensus_config::AuthorityIndex;
use parking_lot::{Mutex, RwLock};
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::sleep;

struct FakeCoreThreadDispatcher {
blocks: Mutex<Vec<VerifiedBlock>>,
@@ -750,6 +757,13 @@ mod tests {
Ok(Default::default())
}

async fn set_peer_accepted_rounds_from_block(
&self,
_extended_block: ExtendedBlock,
) -> Result<(), CoreError> {
todo!()
}

fn set_subscriber_exists(&self, _exists: bool) -> Result<(), CoreError> {
todo!()
}
(The remaining changed files in this commit are not shown here.)
