diff --git a/omnipaxos/src/omni_paxos.rs b/omnipaxos/src/omni_paxos.rs index f98c9ba7..47d925fc 100644 --- a/omnipaxos/src/omni_paxos.rs +++ b/omnipaxos/src/omni_paxos.rs @@ -1,14 +1,14 @@ use crate::{ ballot_leader_election::{Ballot, BallotLeaderElection}, errors::{valid_config, ConfigError}, - messages::Message, + messages::{ballot_leader_election::BLEMessage, sequence_paxos::PaxosMessage, Message}, sequence_paxos::{Phase, SequencePaxos}, storage::{Entry, StopSign, Storage}, util::{ defaults::{BUFFER_SIZE, ELECTION_TIMEOUT, FLUSH_BATCH_TIMEOUT, RESEND_MESSAGE_TIMEOUT}, ConfigurationId, FlexibleQuorum, LogEntry, LogicalClock, NodeId, }, - utils::{ui, ui::ClusterState}, + utils::ui::{self, ClusterState}, }; #[cfg(any(feature = "toml_config", feature = "serde"))] use serde::Deserialize; @@ -20,6 +20,7 @@ use std::{ error::Error, fmt::{Debug, Display}, ops::RangeBounds, + sync::RwLock, }; #[cfg(feature = "toml_config")] use toml; @@ -69,13 +70,16 @@ impl OmniPaxosConfig { .get_promise() .expect("storage error while trying to read promise"); Ok(OmniPaxos { - ble: BallotLeaderElection::with(self.clone().into(), recovered_leader), + ble: RwLock::new(BallotLeaderElection::with( + self.clone().into(), + recovered_leader, + )), election_clock: LogicalClock::with(self.server_config.election_tick_timeout), resend_message_clock: LogicalClock::with( self.server_config.resend_message_tick_timeout, ), flush_batch_clock: LogicalClock::with(self.server_config.flush_batch_tick_timeout), - seq_paxos: SequencePaxos::with(self.into(), storage), + seq_paxos: RwLock::new(SequencePaxos::with(self.into(), storage)), }) } } @@ -227,8 +231,8 @@ where T: Entry, B: Storage, { - seq_paxos: SequencePaxos, - ble: BallotLeaderElection, + seq_paxos: RwLock>, + ble: RwLock, election_clock: LogicalClock, resend_message_clock: LogicalClock, flush_batch_clock: LogicalClock, @@ -242,8 +246,8 @@ where /// Initiates the trim process. /// # Arguments /// * `trim_index` - Deletes all entries up to [`trim_index`], if the [`trim_index`] is `None` then the minimum index accepted by **ALL** servers will be used as the [`trim_index`]. - pub fn trim(&mut self, trim_index: Option) -> Result<(), CompactionErr> { - self.seq_paxos.trim(trim_index) + pub fn trim(&self, trim_index: Option) -> Result<(), CompactionErr> { + self.seq_paxos.write().unwrap().trim(trim_index) } /// Trim the log and create a snapshot. ** Note: only up to the `decided_idx` can be snapshotted ** @@ -251,21 +255,24 @@ where /// `compact_idx` - Snapshots all entries < [`compact_idx`], if the [`compact_idx`] is None then the decided index will be used. /// `local_only` - If `true`, only this server snapshots the log. If `false` all servers performs the snapshot. pub fn snapshot( - &mut self, + &self, compact_idx: Option, local_only: bool, ) -> Result<(), CompactionErr> { - self.seq_paxos.snapshot(compact_idx, local_only) + self.seq_paxos + .write() + .unwrap() + .snapshot(compact_idx, local_only) } /// Return the decided index. 0 means that no entry has been decided. pub fn get_decided_idx(&self) -> usize { - self.seq_paxos.get_decided_idx() + self.seq_paxos.read().unwrap().get_decided_idx() } /// Return trim index from storage. pub fn get_compacted_idx(&self) -> usize { - self.seq_paxos.get_compacted_idx() + self.seq_paxos.read().unwrap().get_compacted_idx() } /// Returns the ID of the current leader and whether the node's `Phase` is `Phase::Accepted`. @@ -275,30 +282,50 @@ where /// necessarily imply that the leader is not in the accepted phase; it only reflects the current /// phase of this node. pub fn get_current_leader(&self) -> Option<(NodeId, bool)> { - let promised_pid = self.seq_paxos.get_promise().pid; + let seq_paxos = self.seq_paxos.read().unwrap(); + let promised_pid = seq_paxos.get_promise().pid; if promised_pid == 0 { None } else { - let is_accepted = self.seq_paxos.get_state().1 == Phase::Accept; + let is_accepted = seq_paxos.get_state().1 == Phase::Accept; Some((promised_pid, is_accepted)) } } /// Returns the promised ballot of this node. pub fn get_promise(&self) -> Ballot { - self.seq_paxos.get_promise() + self.seq_paxos.read().unwrap().get_promise() } /// Moves outgoing messages from this server into the buffer. The messages should then be sent via the network implementation. - pub fn take_outgoing_messages(&mut self, buffer: &mut Vec>) { - self.seq_paxos.take_outgoing_msgs(buffer); - buffer.extend(self.ble.outgoing_mut().drain(..).map(|b| Message::BLE(b))); + pub fn take_outgoing_messages(&self, buffer: &mut Vec>) { + self.take_outgoing_messages_paxos(buffer); + self.take_outgoing_messages_ble(buffer); + } + + /// Drain pending outgoing sequence paxos messages into the provided buffer. + pub fn take_outgoing_messages_paxos(&self, buf: &mut Vec>) { + self.seq_paxos.write().unwrap().take_outgoing_msgs(buf); + } + + /// Drain pending outgoing BLE messages into the provided buffer. + pub fn take_outgoing_messages_ble(&self, buf: &mut Vec>) { + buf.extend( + self.ble + .write() + .unwrap() + .outgoing_mut() + .drain(..) + .map(Message::BLE), + ); } /// Read entry at index `idx` in the log. Returns `None` if `idx` is out of bounds. pub fn read(&self, idx: usize) -> Option> { match self .seq_paxos + .read() + .unwrap() .internal_storage .read(idx..idx + 1) .expect("storage error while trying to read log entries") @@ -314,6 +341,8 @@ where R: RangeBounds, { self.seq_paxos + .read() + .unwrap() .internal_storage .read(r) .expect("storage error while trying to read log entries") @@ -322,27 +351,34 @@ where /// Read all decided entries starting at `from_idx` (inclusive) in the log. Returns `None` if `from_idx` is out of bounds. pub fn read_decided_suffix(&self, from_idx: usize) -> Option>> { self.seq_paxos + .read() + .unwrap() .internal_storage .read_decided_suffix(from_idx) .expect("storage error while trying to read decided log suffix") } /// Handle an incoming message - pub fn handle_incoming(&mut self, m: Message) { + pub fn handle_incoming(&self, m: Message) { match m { - Message::SequencePaxos(p) => self.seq_paxos.handle(p), - Message::BLE(b) => self.ble.handle(b), + Message::SequencePaxos(p) => self.handle_incoming_paxos(p), + Message::BLE(b) => self.handle_incoming_ble(b), } } + /// Handle on incoming [PaxosMessage]. + pub fn handle_incoming_paxos(&self, m: PaxosMessage) { + self.seq_paxos.write().unwrap().handle(m); + } + /// Returns whether this Sequence Paxos has been reconfigured pub fn is_reconfigured(&self) -> Option { - self.seq_paxos.is_reconfigured() + self.seq_paxos.read().unwrap().is_reconfigured() } /// Append an entry to the replicated log. - pub fn append(&mut self, entry: T) -> Result<(), ProposeErr> { - self.seq_paxos.append(entry) + pub fn append(&self, entry: T) -> Result<(), ProposeErr> { + self.seq_paxos.write().unwrap().append(entry) } /// Propose a cluster reconfiguration. Returns an error if the current configuration has already been stopped @@ -350,7 +386,7 @@ where /// `new_configuration` defines the cluster-wide configuration settings for the **next** cluster. /// `metadata` is optional data to commit alongside the reconfiguration. pub fn reconfigure( - &mut self, + &self, new_configuration: ClusterConfig, metadata: Option>, ) -> Result<(), ProposeErr> { @@ -361,69 +397,94 @@ where metadata, )); } - self.seq_paxos.reconfigure(new_configuration, metadata) + self.seq_paxos + .write() + .unwrap() + .reconfigure(new_configuration, metadata) } /// Handles re-establishing a connection to a previously disconnected peer. /// This should only be called if the underlying network implementation indicates that a connection has been re-established. - pub fn reconnected(&mut self, pid: NodeId) { - self.seq_paxos.reconnected(pid) + pub fn reconnected(&self, pid: NodeId) { + self.seq_paxos.write().unwrap().reconnected(pid) } /// Increments the internal logical clock. This drives the processes for leader changes, resending dropped messages, and flushing batched log entries. /// Each of these is triggered every `election_tick_timeout`, `resend_message_tick_timeout`, and `flush_batch_tick_timeout` number of calls to this function /// (See how to configure these timeouts in `ServerConfig`). - pub fn tick(&mut self) { - if self.election_clock.tick_and_check_timeout() { - self.election_timeout(); - } + pub fn tick(&self) { + self.tick_ble(); + let mut seq_paxos = self.seq_paxos.write().unwrap(); if self.resend_message_clock.tick_and_check_timeout() { - self.seq_paxos.resend_message_timeout(); + seq_paxos.resend_message_timeout(); } if self.flush_batch_clock.tick_and_check_timeout() { - self.seq_paxos.flush_batch_timeout(); + seq_paxos.flush_batch_timeout(); } } /// Manually attempt to become the leader by incrementing this instance's Ballot. Calling this /// function may not result in gainig leadership if other instances are competing for /// leadership with higher Ballots. - pub fn try_become_leader(&mut self) { - let mut my_ballot = self.ble.get_current_ballot(); - let promise = self.seq_paxos.get_promise(); + pub fn try_become_leader(&self) { + let mut my_ballot = { + let ble = self.ble.read().unwrap(); + ble.get_current_ballot() + }; + let mut seq_paxos = self.seq_paxos.write().unwrap(); + let promise = seq_paxos.get_promise(); my_ballot.n = promise.n + 1; - self.seq_paxos.handle_leader(my_ballot); + seq_paxos.handle_leader(my_ballot); } /*** BLE calls ***/ /// Update the custom priority used in the Ballot for this server. Note that changing the /// priority triggers a leader re-election. - pub fn set_priority(&mut self, p: u32) { - self.ble.set_priority(p) + pub fn set_priority(&self, p: u32) { + self.ble.write().unwrap().set_priority(p) + } + + /// Handle an incoming [BLEMessage]. + pub fn handle_incoming_ble(&self, m: BLEMessage) { + self.ble.write().unwrap().handle(m); + } + + /// Tick only the ballot leader election. + pub fn tick_ble(&self) { + if self.election_clock.tick_and_check_timeout() { + self.election_timeout(); + } } /// If the heartbeat of a leader is not received when election_timeout() is called, the server might attempt to become the leader. /// It is also used for the election process, where the server checks if it can become the leader. /// For instance if `election_timeout()` is called every 100ms, then if the leader fails, the servers will detect it after 100ms and elect a new server after another 100ms if possible. - fn election_timeout(&mut self) { - if let Some(new_leader) = self - .ble - .hb_timeout(self.seq_paxos.get_state(), self.seq_paxos.get_promise()) - { - self.seq_paxos.handle_leader(new_leader); + fn election_timeout(&self) { + let maybe_new_leader = { + let mut ble = self.ble.write().unwrap(); + let seq_paxos = self.seq_paxos.read().unwrap(); + ble.hb_timeout(seq_paxos.get_state(), seq_paxos.get_promise()) + }; + if let Some(new_leader) = maybe_new_leader { + let mut seq_paxos = self.seq_paxos.write().unwrap(); + seq_paxos.handle_leader(new_leader); } } /// Returns the current states of the OmniPaxos instance for OmniPaxos UI to display. pub fn get_ui_states(&self) -> ui::OmniPaxosStates { - let mut cluster_state = ClusterState::from(self.seq_paxos.get_leader_state()); - cluster_state.heartbeats = self.ble.get_ballots(); + let mut cluster_state = { + let seq_paxos = self.seq_paxos.read().unwrap(); + ClusterState::from(seq_paxos.get_leader_state()) + }; + let ble = self.ble.read().unwrap(); + cluster_state.heartbeats = ble.get_ballots(); ui::OmniPaxosStates { - current_ballot: self.ble.get_current_ballot(), + current_ballot: ble.get_current_ballot(), current_leader: self.get_current_leader().map(|(leader, _)| leader), decided_idx: self.get_decided_idx(), - heartbeats: self.ble.get_ballots(), + heartbeats: ble.get_ballots(), cluster_state, } } diff --git a/omnipaxos/src/util.rs b/omnipaxos/src/util.rs index beedefcd..332db1a3 100644 --- a/omnipaxos/src/util.rs +++ b/omnipaxos/src/util.rs @@ -6,7 +6,7 @@ use super::{ use nohash_hasher::IntMap; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use std::{cmp::Ordering, marker::PhantomData}; +use std::{cmp::Ordering, marker::PhantomData, sync::atomic::AtomicU64}; /// Struct used to help another server synchronize their log with the current state of our own log. #[derive(Clone, Debug)] @@ -400,19 +400,22 @@ impl SequenceNumber { } pub(crate) struct LogicalClock { - time: u64, + time: AtomicU64, timeout: u64, } impl LogicalClock { pub fn with(timeout: u64) -> Self { - Self { time: 0, timeout } + Self { + time: AtomicU64::new(0), + timeout, + } } - pub fn tick_and_check_timeout(&mut self) -> bool { - self.time += 1; - if self.time == self.timeout { - self.time = 0; + pub fn tick_and_check_timeout(&self) -> bool { + let t = self.time.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if t + 1 == self.timeout { + self.time.store(0, std::sync::atomic::Ordering::SeqCst); true } else { false diff --git a/omnipaxos/tests/atomic_storage_test.rs b/omnipaxos/tests/atomic_storage_test.rs index d72fdb6b..8f7e99df 100644 --- a/omnipaxos/tests/atomic_storage_test.rs +++ b/omnipaxos/tests/atomic_storage_test.rs @@ -72,7 +72,7 @@ fn _setup_leader() -> ( BrokenStore, OmniPaxos>, ) { - let (mem_storage, storage_conf, mut op) = setup_follower(); + let (mem_storage, storage_conf, op) = setup_follower(); let mut n = mem_storage.lock().unwrap().get_promise().unwrap().unwrap(); let n_old = n; let setup_msg = Message::::BLE(BLEMessage { @@ -148,7 +148,7 @@ fn setup_follower() -> ( BrokenStore, OmniPaxos>, ) { - let (mem_storage, storage_conf, mut op) = basic_setup(); + let (mem_storage, storage_conf, op) = basic_setup(); let mut n = mem_storage.lock().unwrap().get_promise().unwrap().unwrap(); n.config_id = 1; n.n += 1; @@ -204,7 +204,7 @@ fn setup_follower() -> ( #[serial] fn atomic_storage_acceptsync_test() { fn run_single_test(fail_after_n_ops: usize) { - let (mem_storage, storage_conf, mut op) = basic_setup(); + let (mem_storage, storage_conf, op) = basic_setup(); let mut n = mem_storage.lock().unwrap().get_promise().unwrap().unwrap(); n.n += 1; n.pid = 2; @@ -271,7 +271,7 @@ fn atomic_storage_acceptsync_test() { #[serial] fn atomic_storage_trim_test() { fn run_single_test(fail_after_n_ops: usize) { - let (mem_storage, storage_conf, mut op) = setup_follower(); + let (mem_storage, storage_conf, op) = setup_follower(); let setup_msg = Message::::SequencePaxos(PaxosMessage { from: 2, @@ -335,7 +335,7 @@ fn atomic_storage_trim_test() { #[serial] fn atomic_storage_snapshot_test() { fn run_single_test(fail_after_n_ops: usize) { - let (mem_storage, storage_conf, mut op) = setup_follower(); + let (mem_storage, storage_conf, op) = setup_follower(); let setup_msg = Message::::SequencePaxos(PaxosMessage { from: 2, @@ -405,7 +405,7 @@ fn atomic_storage_snapshot_test() { #[serial] fn atomic_storage_accept_decide_test() { fn run_single_test(fail_after_n_ops: usize) { - let (mem_storage, storage_conf, mut op) = setup_follower(); + let (mem_storage, storage_conf, op) = setup_follower(); let old_log_len = mem_storage.lock().unwrap().get_log_len().unwrap(); let old_decided_idx = mem_storage.lock().unwrap().get_decided_idx().unwrap(); @@ -458,7 +458,7 @@ fn atomic_storage_accept_decide_test() { #[serial] fn atomic_storage_majority_promises_test() { fn run_single_test(fail_after_n_ops: usize) { - let (mem_storage, storage_conf, mut op) = setup_follower(); + let (mem_storage, storage_conf, op) = setup_follower(); let mut n = mem_storage.lock().unwrap().get_promise().unwrap().unwrap(); // Send messages to 1 such that it tries to take over leadership let n_old = n; diff --git a/omnipaxos/tests/consensus_test.rs b/omnipaxos/tests/consensus_test.rs index f7d2a395..ce31ae82 100644 --- a/omnipaxos/tests/consensus_test.rs +++ b/omnipaxos/tests/consensus_test.rs @@ -87,7 +87,7 @@ fn read_test() { op_config.cluster_config.nodes = vec![1, 2, 3]; op_config.cluster_config.configuration_id = 1; - let mut omni_paxos = op_config.clone().build(storage).unwrap(); + let omni_paxos = op_config.clone().build(storage).unwrap(); // read decided entries let entries = omni_paxos @@ -137,7 +137,7 @@ fn read_test() { .set_decided_idx(log_len + 1) .expect("Failed to set decided index"); - let mut stopped_op = op_config.build(stopped_storage).unwrap(); + let stopped_op = op_config.build(stopped_storage).unwrap(); stopped_op .snapshot(Some(snapshotted_idx), true) .expect("Failed to snapshot"); @@ -175,7 +175,7 @@ fn read_entries_test() { op_config.cluster_config.nodes = vec![1, 2, 3]; op_config.cluster_config.configuration_id = 1; - let mut omni_paxos = op_config.clone().build(storage).unwrap(); + let omni_paxos = op_config.clone().build(storage).unwrap(); omni_paxos .snapshot(Some(snapshotted_idx), true) .expect("Failed to snapshot"); @@ -228,7 +228,7 @@ fn read_entries_test() { stopped_storage.set_stopsign(Some(ss.clone())).unwrap(); stopped_storage.set_decided_idx(log_len + 1).unwrap(); - let mut stopped_op = op_config.build(stopped_storage).unwrap(); + let stopped_op = op_config.build(stopped_storage).unwrap(); stopped_op .snapshot(Some(snapshotted_idx), true) .expect("Failed to snapshot"); diff --git a/omnipaxos_storage/src/persistent_storage.rs b/omnipaxos_storage/src/persistent_storage.rs index d49a8f6e..8024de0a 100644 --- a/omnipaxos_storage/src/persistent_storage.rs +++ b/omnipaxos_storage/src/persistent_storage.rs @@ -178,7 +178,7 @@ where } /// Get handle to the log column family of the database - fn get_log_handle(&self) -> ColumnFamilyRef { + fn get_log_handle(&self) -> ColumnFamilyRef<'_> { self.db .cf_handle(LOG) .expect("Couldn't find RocksDB log column family") diff --git a/omnipaxos_ui/src/render.rs b/omnipaxos_ui/src/render.rs index d900a4eb..7aac6e2a 100644 --- a/omnipaxos_ui/src/render.rs +++ b/omnipaxos_ui/src/render.rs @@ -152,7 +152,7 @@ fn draw_title<'a>(app: &App) -> Paragraph<'a> { ) } -fn draw_chart(app: &App, window_width: usize) -> BarChart { +fn draw_chart(app: &App, window_width: usize) -> BarChart<'_> { let data: &Vec<(&str, u64)> = &app .throughput_data .iter()