Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 111 additions & 50 deletions omnipaxos/src/omni_paxos.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::{
ballot_leader_election::{Ballot, BallotLeaderElection},
errors::{valid_config, ConfigError},
messages::Message,
messages::{ballot_leader_election::BLEMessage, sequence_paxos::PaxosMessage, Message},
sequence_paxos::{Phase, SequencePaxos},
storage::{Entry, StopSign, Storage},
util::{
defaults::{BUFFER_SIZE, ELECTION_TIMEOUT, FLUSH_BATCH_TIMEOUT, RESEND_MESSAGE_TIMEOUT},
ConfigurationId, FlexibleQuorum, LogEntry, LogicalClock, NodeId,
},
utils::{ui, ui::ClusterState},
utils::ui::{self, ClusterState},
};
#[cfg(any(feature = "toml_config", feature = "serde"))]
use serde::Deserialize;
Expand All @@ -20,6 +20,7 @@ use std::{
error::Error,
fmt::{Debug, Display},
ops::RangeBounds,
sync::RwLock,
};
#[cfg(feature = "toml_config")]
use toml;
Expand Down Expand Up @@ -69,13 +70,16 @@ impl OmniPaxosConfig {
.get_promise()
.expect("storage error while trying to read promise");
Ok(OmniPaxos {
ble: BallotLeaderElection::with(self.clone().into(), recovered_leader),
ble: RwLock::new(BallotLeaderElection::with(
self.clone().into(),
recovered_leader,
)),
election_clock: LogicalClock::with(self.server_config.election_tick_timeout),
resend_message_clock: LogicalClock::with(
self.server_config.resend_message_tick_timeout,
),
flush_batch_clock: LogicalClock::with(self.server_config.flush_batch_tick_timeout),
seq_paxos: SequencePaxos::with(self.into(), storage),
seq_paxos: RwLock::new(SequencePaxos::with(self.into(), storage)),
})
}
}
Expand Down Expand Up @@ -227,8 +231,8 @@ where
T: Entry,
B: Storage<T>,
{
seq_paxos: SequencePaxos<T, B>,
ble: BallotLeaderElection,
seq_paxos: RwLock<SequencePaxos<T, B>>,
ble: RwLock<BallotLeaderElection>,
election_clock: LogicalClock,
resend_message_clock: LogicalClock,
flush_batch_clock: LogicalClock,
Expand All @@ -242,30 +246,33 @@ where
/// Initiates the trim process.
/// # Arguments
/// * `trim_index` - Deletes all entries up to [`trim_index`]. If [`trim_index`] is `None`, the minimum index accepted by **ALL** servers is used as the [`trim_index`].
pub fn trim(&mut self, trim_index: Option<usize>) -> Result<(), CompactionErr> {
self.seq_paxos.trim(trim_index)
pub fn trim(&self, trim_index: Option<usize>) -> Result<(), CompactionErr> {
self.seq_paxos.write().unwrap().trim(trim_index)
}

/// Trim the log and create a snapshot. **Note: only entries up to the `decided_idx` can be snapshotted.**
/// # Arguments
/// * `compact_idx` - Snapshots all entries < [`compact_idx`]. If [`compact_idx`] is `None`, the decided index is used.
/// * `local_only` - If `true`, only this server snapshots the log. If `false`, all servers perform the snapshot.
pub fn snapshot(
&mut self,
&self,
compact_idx: Option<usize>,
local_only: bool,
) -> Result<(), CompactionErr> {
self.seq_paxos.snapshot(compact_idx, local_only)
self.seq_paxos
.write()
.unwrap()
.snapshot(compact_idx, local_only)
}

/// Return the decided index. 0 means that no entry has been decided.
pub fn get_decided_idx(&self) -> usize {
self.seq_paxos.get_decided_idx()
self.seq_paxos.read().unwrap().get_decided_idx()
}

/// Return trim index from storage.
pub fn get_compacted_idx(&self) -> usize {
self.seq_paxos.get_compacted_idx()
self.seq_paxos.read().unwrap().get_compacted_idx()
}

/// Returns the ID of the current leader and whether the node's `Phase` is `Phase::Accept`.
Expand All @@ -275,30 +282,50 @@ where
/// necessarily imply that the leader is not in the accepted phase; it only reflects the current
/// phase of this node.
pub fn get_current_leader(&self) -> Option<(NodeId, bool)> {
let promised_pid = self.seq_paxos.get_promise().pid;
let seq_paxos = self.seq_paxos.read().unwrap();
let promised_pid = seq_paxos.get_promise().pid;
if promised_pid == 0 {
None
} else {
let is_accepted = self.seq_paxos.get_state().1 == Phase::Accept;
let is_accepted = seq_paxos.get_state().1 == Phase::Accept;
Some((promised_pid, is_accepted))
}
}

/// Returns the promised ballot of this node.
pub fn get_promise(&self) -> Ballot {
self.seq_paxos.get_promise()
self.seq_paxos.read().unwrap().get_promise()
}

/// Moves outgoing messages from this server into the buffer. The messages should then be sent via the network implementation.
pub fn take_outgoing_messages(&mut self, buffer: &mut Vec<Message<T>>) {
self.seq_paxos.take_outgoing_msgs(buffer);
buffer.extend(self.ble.outgoing_mut().drain(..).map(|b| Message::BLE(b)));
pub fn take_outgoing_messages(&self, buffer: &mut Vec<Message<T>>) {
self.take_outgoing_messages_paxos(buffer);
self.take_outgoing_messages_ble(buffer);
}

/// Drain pending outgoing sequence paxos messages into the provided buffer.
pub fn take_outgoing_messages_paxos(&self, buf: &mut Vec<Message<T>>) {
self.seq_paxos.write().unwrap().take_outgoing_msgs(buf);
}

/// Drain pending outgoing BLE messages into the provided buffer.
pub fn take_outgoing_messages_ble(&self, buf: &mut Vec<Message<T>>) {
buf.extend(
self.ble
.write()
.unwrap()
.outgoing_mut()
.drain(..)
.map(Message::BLE),
);
}

/// Read entry at index `idx` in the log. Returns `None` if `idx` is out of bounds.
pub fn read(&self, idx: usize) -> Option<LogEntry<T>> {
match self
.seq_paxos
.read()
.unwrap()
.internal_storage
.read(idx..idx + 1)
.expect("storage error while trying to read log entries")
Expand All @@ -314,6 +341,8 @@ where
R: RangeBounds<usize>,
{
self.seq_paxos
.read()
.unwrap()
.internal_storage
.read(r)
.expect("storage error while trying to read log entries")
Expand All @@ -322,35 +351,42 @@ where
/// Read all decided entries starting at `from_idx` (inclusive) in the log. Returns `None` if `from_idx` is out of bounds.
pub fn read_decided_suffix(&self, from_idx: usize) -> Option<Vec<LogEntry<T>>> {
self.seq_paxos
.read()
.unwrap()
.internal_storage
.read_decided_suffix(from_idx)
.expect("storage error while trying to read decided log suffix")
}

/// Handle an incoming message
pub fn handle_incoming(&mut self, m: Message<T>) {
pub fn handle_incoming(&self, m: Message<T>) {
match m {
Message::SequencePaxos(p) => self.seq_paxos.handle(p),
Message::BLE(b) => self.ble.handle(b),
Message::SequencePaxos(p) => self.handle_incoming_paxos(p),
Message::BLE(b) => self.handle_incoming_ble(b),
}
}

/// Handle an incoming [PaxosMessage].
pub fn handle_incoming_paxos(&self, m: PaxosMessage<T>) {
self.seq_paxos.write().unwrap().handle(m);
}

/// Returns whether this Sequence Paxos has been reconfigured
pub fn is_reconfigured(&self) -> Option<StopSign> {
self.seq_paxos.is_reconfigured()
self.seq_paxos.read().unwrap().is_reconfigured()
}

/// Append an entry to the replicated log.
pub fn append(&mut self, entry: T) -> Result<(), ProposeErr<T>> {
self.seq_paxos.append(entry)
pub fn append(&self, entry: T) -> Result<(), ProposeErr<T>> {
self.seq_paxos.write().unwrap().append(entry)
}

/// Propose a cluster reconfiguration. Returns an error if the current configuration has already been stopped
/// by a previous reconfiguration request or if the `new_configuration` is invalid.
/// `new_configuration` defines the cluster-wide configuration settings for the **next** cluster.
/// `metadata` is optional data to commit alongside the reconfiguration.
pub fn reconfigure(
&mut self,
&self,
new_configuration: ClusterConfig,
metadata: Option<Vec<u8>>,
) -> Result<(), ProposeErr<T>> {
Expand All @@ -361,69 +397,94 @@ where
metadata,
));
}
self.seq_paxos.reconfigure(new_configuration, metadata)
self.seq_paxos
.write()
.unwrap()
.reconfigure(new_configuration, metadata)
}

/// Handles re-establishing a connection to a previously disconnected peer.
/// This should only be called if the underlying network implementation indicates that a connection has been re-established.
pub fn reconnected(&mut self, pid: NodeId) {
self.seq_paxos.reconnected(pid)
pub fn reconnected(&self, pid: NodeId) {
self.seq_paxos.write().unwrap().reconnected(pid)
}

/// Increments the internal logical clock. This drives the processes for leader changes, resending dropped messages, and flushing batched log entries.
/// These are triggered every `election_tick_timeout`, `resend_message_tick_timeout`, and `flush_batch_tick_timeout` calls to this function, respectively
/// (see how to configure these timeouts in `ServerConfig`).
pub fn tick(&mut self) {
if self.election_clock.tick_and_check_timeout() {
self.election_timeout();
}
pub fn tick(&self) {
self.tick_ble();
let mut seq_paxos = self.seq_paxos.write().unwrap();
if self.resend_message_clock.tick_and_check_timeout() {
self.seq_paxos.resend_message_timeout();
seq_paxos.resend_message_timeout();
}
if self.flush_batch_clock.tick_and_check_timeout() {
self.seq_paxos.flush_batch_timeout();
seq_paxos.flush_batch_timeout();
}
}

/// Manually attempt to become the leader by incrementing this instance's Ballot. Calling this
/// function may not result in gaining leadership if other instances are competing for
/// leadership with higher Ballots.
pub fn try_become_leader(&mut self) {
let mut my_ballot = self.ble.get_current_ballot();
let promise = self.seq_paxos.get_promise();
pub fn try_become_leader(&self) {
let mut my_ballot = {
let ble = self.ble.read().unwrap();
ble.get_current_ballot()
};
let mut seq_paxos = self.seq_paxos.write().unwrap();
let promise = seq_paxos.get_promise();
my_ballot.n = promise.n + 1;
self.seq_paxos.handle_leader(my_ballot);
seq_paxos.handle_leader(my_ballot);
}

/*** BLE calls ***/
/// Update the custom priority used in the Ballot for this server. Note that changing the
/// priority triggers a leader re-election.
pub fn set_priority(&mut self, p: u32) {
self.ble.set_priority(p)
pub fn set_priority(&self, p: u32) {
self.ble.write().unwrap().set_priority(p)
}

/// Handle an incoming [BLEMessage].
pub fn handle_incoming_ble(&self, m: BLEMessage) {
self.ble.write().unwrap().handle(m);
}

/// Tick only the ballot leader election.
pub fn tick_ble(&self) {
if self.election_clock.tick_and_check_timeout() {
self.election_timeout();
}
}

/// If the heartbeat of a leader is not received when `election_timeout()` is called, the server might attempt to become the leader.
/// It is also used for the election process, where the server checks if it can become the leader.
/// For instance, if `election_timeout()` is called every 100ms and the leader fails, the servers will detect it after 100ms and elect a new server after another 100ms if possible.
fn election_timeout(&mut self) {
if let Some(new_leader) = self
.ble
.hb_timeout(self.seq_paxos.get_state(), self.seq_paxos.get_promise())
{
self.seq_paxos.handle_leader(new_leader);
fn election_timeout(&self) {
let maybe_new_leader = {
let mut ble = self.ble.write().unwrap();
let seq_paxos = self.seq_paxos.read().unwrap();
ble.hb_timeout(seq_paxos.get_state(), seq_paxos.get_promise())
};
if let Some(new_leader) = maybe_new_leader {
let mut seq_paxos = self.seq_paxos.write().unwrap();
seq_paxos.handle_leader(new_leader);
}
}

/// Returns the current states of the OmniPaxos instance for OmniPaxos UI to display.
pub fn get_ui_states(&self) -> ui::OmniPaxosStates {
let mut cluster_state = ClusterState::from(self.seq_paxos.get_leader_state());
cluster_state.heartbeats = self.ble.get_ballots();
let mut cluster_state = {
let seq_paxos = self.seq_paxos.read().unwrap();
ClusterState::from(seq_paxos.get_leader_state())
};
let ble = self.ble.read().unwrap();
cluster_state.heartbeats = ble.get_ballots();

ui::OmniPaxosStates {
current_ballot: self.ble.get_current_ballot(),
current_ballot: ble.get_current_ballot(),
current_leader: self.get_current_leader().map(|(leader, _)| leader),
decided_idx: self.get_decided_idx(),
heartbeats: self.ble.get_ballots(),
heartbeats: ble.get_ballots(),
cluster_state,
}
}
Expand Down
17 changes: 10 additions & 7 deletions omnipaxos/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{
use nohash_hasher::IntMap;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::{cmp::Ordering, marker::PhantomData};
use std::{cmp::Ordering, marker::PhantomData, sync::atomic::AtomicU64};

/// Struct used to help another server synchronize their log with the current state of our own log.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -400,19 +400,22 @@ impl SequenceNumber {
}

pub(crate) struct LogicalClock {
time: u64,
time: AtomicU64,
timeout: u64,
}

impl LogicalClock {
pub fn with(timeout: u64) -> Self {
Self { time: 0, timeout }
Self {
time: AtomicU64::new(0),
timeout,
}
}

pub fn tick_and_check_timeout(&mut self) -> bool {
self.time += 1;
if self.time == self.timeout {
self.time = 0;
pub fn tick_and_check_timeout(&self) -> bool {
let t = self.time.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if t + 1 == self.timeout {
self.time.store(0, std::sync::atomic::Ordering::SeqCst);
true
} else {
false
Expand Down
Loading