diff --git a/.gitignore b/.gitignore index 4ad91fc..951054a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ **.log tmp/ .vscode -db/ \ No newline at end of file +db/ +archives \ No newline at end of file diff --git a/tinydancer/src/consensus.rs b/tinydancer/src/consensus.rs new file mode 100644 index 0000000..5767c4e --- /dev/null +++ b/tinydancer/src/consensus.rs @@ -0,0 +1,244 @@ +use crate::sampler::{ArchiveConfig, SlotSubscribeResponse}; +use crate::tinydancer::{endpoint, ClientService, ClientStatus, Cluster}; +use crate::{convert_to_websocket, send_rpc_call, try_coerce_shred}; +use anyhow::anyhow; +use async_trait::async_trait; +use crossbeam::channel::{Receiver, Sender}; +use futures::Sink; +use itertools::Itertools; +use rand::distributions::Uniform; +use rand::prelude::*; +use rayon::prelude::*; +use reqwest::Request; +use rocksdb::{ColumnFamily, Options as RocksOptions, DB}; +use serde::de::DeserializeOwned; +use serde_derive::Deserialize; +use serde_derive::Serialize; +use solana_ledger::shred::{ShredId, ShredType}; +use solana_ledger::{ + ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, + blockstore::Blockstore, + // blockstore_db::columns::ShredCode, + shred::{Nonce, Shred, ShredCode, ShredData, ShredFetchStats, SIZE_OF_NONCE}, +}; +use solana_sdk::hash::hashv; +use solana_sdk::{ + clock::Slot, + genesis_config::ClusterType, + hash::{Hash, HASH_BYTES}, + packet::PACKET_DATA_SIZE, + pubkey::{Pubkey, PUBKEY_BYTES}, + signature::{Signable, Signature, Signer, SIGNATURE_BYTES}, + signer::keypair::Keypair, + timing::{duration_as_ms, timestamp}, +}; +use std::str::FromStr; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::{error::Error, ops::Add}; +use std::{ + net::{SocketAddr, UdpSocket}, + thread::Builder, +}; +use tiny_logger::logs::{debug, error, info}; +use tokio::{ + sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, + task::{JoinError, JoinHandle}, +}; +use tungstenite::{connect, Message}; +use url::Url; + +pub struct ConsensusService { + consensus_indices: Vec, + consensus_handler: JoinHandle<()>, +} + +pub struct ConsensusServiceConfig { + pub cluster: Cluster, + pub archive_config: ArchiveConfig, + pub instance: Arc, + pub client_status: Arc>, + pub sample_qty: usize, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcBlockCommitment { + pub commitment: Option, + pub total_stake: u64, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetCommittmentResponse { + pub jsonrpc: String, + pub result: RpcBlockCommitment, + pub id: i64, +} + +pub const MAX_LOCKOUT_HISTORY: usize = 31; +pub type BlockCommitmentArray = [u64; MAX_LOCKOUT_HISTORY + 1]; + +pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64; + +#[async_trait] +impl ClientService for ConsensusService { + type ServiceError = tokio::task::JoinError; + + fn new(config: ConsensusServiceConfig) -> Self { + let consensus_handler = tokio::spawn(async move { + let rpc_url = endpoint(config.cluster); + let pub_sub = convert_to_websocket!(rpc_url); + + let mut threads = Vec::default(); + + let (slot_update_tx, slot_update_rx) = crossbeam::channel::unbounded::(); + + let status_arc = config.client_status.clone(); + + // waits on new slots => triggers slot_verify_loop + threads.push(tokio::spawn(slot_update_loop( + slot_update_tx, + pub_sub, + config.client_status, + ))); + + // verify slot votes + threads.push(tokio::spawn(slot_verify_loop( + slot_update_rx, + rpc_url, + status_arc, + ))); + + for thread in threads { + thread.await; + } + }); + + Self { + consensus_handler, + consensus_indices: Vec::default(), + } + } + + async fn join(self) -> std::result::Result<(), Self::ServiceError> { + self.consensus_handler.await + } +} + +pub async fn slot_update_loop( + slot_update_tx: Sender, + pub_sub: String, + client_status: Arc>, +) -> anyhow::Result<()> { + let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { + Ok((socket, _response)) => Some((socket, _response)), + Err(_) => { + let mut status = client_status.lock().await; + *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + None + } + }; + + if result.is_none() { + return Err(anyhow!("")); + } + + let (mut socket, _response) = result.unwrap(); + + socket.write_message(Message::Text( + r#"{ "jsonrpc": "2.0", "id": 1, "method": "slotSubscribe" }"#.into(), + ))?; + + loop { + match socket.read_message() { + Ok(msg) => { + let res = serde_json::from_str::(msg.to_string().as_str()); + + // info!("res: {:?}", msg.to_string().as_str()); + if let Ok(res) = res { + match slot_update_tx.send(res.params.result.root as u64) { + Ok(_) => { + info!("slot updated: {:?}", res.params.result.root); + } + Err(e) => { + info!("error here: {:?} {:?}", e, res.params.result.root as u64); + continue; // @TODO: we should add retries here incase send fails for some reason + } + } + } + } + Err(e) => info!("err: {:?}", e), + } + } +} + +// verifies the total vote on the slot > 2/3 +fn verify_slot(slot_commitment: RpcBlockCommitment) -> bool { + let commitment_array = &slot_commitment.commitment; + let total_stake = &slot_commitment.total_stake; + let sum: u64 = commitment_array.iter().flatten().sum(); + + if (sum as f64 / *total_stake as f64) > VOTE_THRESHOLD_SIZE { + true + } else { + false + } +} + +pub async fn slot_verify_loop( + slot_update_rx: Receiver, + endpoint: String, + client_status: Arc>, +) -> anyhow::Result<()> { + loop { + let mut status = client_status.lock().await; + if let ClientStatus::Crashed(_) = &*status { + return Err(anyhow!("Client crashed")); + } else { + *status = + ClientStatus::Active(String::from("Monitoring Tinydancer: Verifying consensus")); + } + drop(status); + if let Ok(slot) = slot_update_rx.recv() { + let slot_commitment_result = request_slot_voting(slot, &endpoint).await; + + if let Err(e) = slot_commitment_result { + println!("Error {}", e); + info!("{}", e); + continue; + } + + let slot_commitment = slot_commitment_result.unwrap(); + + let verified = verify_slot(slot_commitment.result); + + if verified { + info!("slot {:?} verified ", slot); + } else { + info!("slot {:?} failed to verified ", slot); + info!("sample INVALID for slot : {:?}", slot); + } + } + } +} + +pub async fn request_slot_voting( + slot: u64, + endpoint: &String, +) -> Result { + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getBlockCommitment", + "params": [ + slot + ] + }) + .to_string(); + + let res = send_rpc_call!(endpoint, request); + + serde_json::from_str::(&res) +} diff --git a/tinydancer/src/macros.rs b/tinydancer/src/macros.rs index c747f67..7c0f646 100644 --- a/tinydancer/src/macros.rs +++ b/tinydancer/src/macros.rs @@ -17,6 +17,17 @@ macro_rules! block_on { rt.handle().block_on($func).expect($error); }; } +#[macro_export] +macro_rules! block_on_async { + ($func:expr) => {{ + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on($func) + }}; +} + #[macro_export] macro_rules! try_coerce_shred { ($response:expr) => {{ diff --git a/tinydancer/src/main.rs b/tinydancer/src/main.rs index 9d1c522..325c710 100644 --- a/tinydancer/src/main.rs +++ b/tinydancer/src/main.rs @@ -46,6 +46,7 @@ use std::{ use tinydancer::{endpoint, Cluster, TinyDancer, TinyDancerConfig}; mod macros; use colored::Colorize; +mod consensus; mod rpc_wrapper; mod sampler; mod ui; @@ -85,6 +86,10 @@ pub enum Commands { /// Duration after which shreds will be purged #[clap(required = false, default_value_t = 10000000)] shred_archive_duration: u64, + + /// Run the node in consensus mode + #[clap(long, short)] + consensus_mode: bool, }, /// Verify the samples for a single slot Verify { @@ -144,6 +149,7 @@ async fn main() -> Result<()> { archive_path, shred_archive_duration, tui_monitor, + consensus_mode, } => { let config_file = get_config_file().map_err(|_| anyhow!("tinydancer config not set"))?; @@ -152,6 +158,7 @@ async fn main() -> Result<()> { rpc_endpoint: get_cluster(config_file.cluster), sample_qty, tui_monitor, + consensus_mode, log_path: config_file.log_path, archive_config: { archive_path @@ -227,8 +234,6 @@ async fn main() -> Result<()> { } } ConfigSubcommands::Set { log_path, cluster } => { - // println!("{:?}", fs::create_dir_all("~/.config/tinydancer")); - let home_path = std::env::var("HOME").unwrap(); let tinydancer_dir = home_path + "/.config/tinydancer"; diff --git a/tinydancer/src/sampler.rs b/tinydancer/src/sampler.rs index 4d7448a..daa1b77 100644 --- a/tinydancer/src/sampler.rs +++ b/tinydancer/src/sampler.rs @@ -31,7 +31,7 @@ use solana_sdk::{ }; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; +use std::sync::Arc; use std::{error::Error, ops::Add}; use std::{ net::{SocketAddr, UdpSocket}, @@ -40,6 +40,7 @@ use std::{ use tiny_logger::logs::{debug, error, info}; use tokio::{ sync::mpsc::UnboundedSender, + sync::{Mutex, MutexGuard}, task::{JoinError, JoinHandle}, }; use tungstenite::{connect, Message}; @@ -56,7 +57,7 @@ pub struct SampleServiceConfig { pub cluster: Cluster, pub archive_config: ArchiveConfig, pub instance: Arc, - pub status_sampler: Arc>, + pub client_status: Arc>, pub sample_qty: usize, } @@ -81,13 +82,13 @@ impl ClientService for SampleService { let (shred_tx, shred_rx) = crossbeam::channel::unbounded(); let (verified_shred_tx, verified_shred_rx) = crossbeam::channel::unbounded(); - let status_arc = config.status_sampler.clone(); + let status_arc = config.client_status.clone(); // waits on new slots => triggers shred_update_loop threads.push(tokio::spawn(slot_update_loop( slot_update_tx, pub_sub, - config.status_sampler, + config.client_status, ))); // sample shreds from new slot @@ -155,16 +156,17 @@ pub async fn request_shreds( serde_json::from_str::(&res) } -async fn slot_update_loop( +pub async fn slot_update_loop( slot_update_tx: Sender, pub_sub: String, - status_sampler: Arc>, + client_status: Arc>, ) -> anyhow::Result<()> { let result = match connect(Url::parse(pub_sub.as_str()).unwrap()) { Ok((socket, _response)) => Some((socket, _response)), Err(_) => { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; *status = ClientStatus::Crashed(String::from("Client can't connect to socket")); + drop(status); None } }; @@ -318,18 +320,19 @@ async fn shred_update_loop( slot_update_rx: Receiver, endpoint: String, shred_tx: Sender<(Vec>, solana_ledger::shred::Pubkey)>, - status_sampler: Arc>, + client_status: Arc>, sample_qty: usize, ) -> anyhow::Result<()> { loop { { - let mut status = status_sampler.lock().unwrap(); + let mut status = client_status.lock().await; if let ClientStatus::Crashed(_) = &*status { return Err(anyhow!("Client crashed")); } else { *status = ClientStatus::Active(String::from( "Monitoring Tinydancer: Actively Sampling Shreds", )); + drop(status) } } @@ -535,6 +538,7 @@ pub struct GetShredResponse { pub result: GetShredResult, pub id: i64, } + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct GetShredResult { diff --git a/tinydancer/src/tinydancer.rs b/tinydancer/src/tinydancer.rs index 09cbcb3..e08f89b 100644 --- a/tinydancer/src/tinydancer.rs +++ b/tinydancer/src/tinydancer.rs @@ -1,15 +1,12 @@ //! Sampler struct - incharge of sampling shreds // use rayon::prelude::*; -use std::{ - env, - sync::{Arc, Mutex, MutexGuard}, - thread::Result, -}; +use std::{env, sync::Arc, thread::Result}; // use tokio::time::Duration; use crate::{ block_on, + consensus::{ConsensusService, ConsensusServiceConfig}, rpc_wrapper::{TransactionService, TransactionServiceConfig}, sampler::{ArchiveConfig, SampleService, SampleServiceConfig, SHRED_CF}, ui::{UiConfig, UiService}, @@ -23,7 +20,12 @@ use tiny_logger::logs::info; // use log::info; // use log4rs; use std::error::Error; -use tokio::{runtime::Runtime, task::JoinError, try_join}; +use tokio::{ + runtime::Runtime, + sync::{Mutex, MutexGuard}, + task::JoinError, + try_join, +}; // use std::{thread, thread::JoinHandle, time::Duration}; #[async_trait] @@ -49,6 +51,7 @@ pub struct TinyDancerConfig { pub enable_ui_service: bool, pub archive_config: ArchiveConfig, pub tui_monitor: bool, + pub consensus_mode: bool, pub log_path: String, } @@ -62,10 +65,8 @@ use std::path::PathBuf; impl TinyDancer { pub async fn start(config: TinyDancerConfig) -> Result<()> { let status = ClientStatus::Initializing(String::from("Starting Up Tinydancer")); - let client_status = Arc::new(Mutex::new(status)); - let status_sampler = client_status.clone(); - + let client_status_ui = client_status.clone(); let TinyDancerConfig { enable_ui_service, rpc_endpoint, @@ -73,9 +74,10 @@ impl TinyDancer { tui_monitor, log_path, archive_config, + consensus_mode, } = config.clone(); std::env::set_var("RUST_LOG", "info"); - tiny_logger::setup_file_with_default(&log_path, "RUST_LOG"); + // tiny_logger::setup_file_with_default(&log_path, "RUST_LOG"); let mut opts = rocksdb::Options::default(); opts.create_if_missing(true); @@ -87,15 +89,6 @@ impl TinyDancer { .unwrap(); let db = Arc::new(db); - let sample_service_config = SampleServiceConfig { - cluster: rpc_endpoint.clone(), - archive_config, - instance: db.clone(), - status_sampler, - sample_qty, - }; - let sample_service = SampleService::new(sample_service_config); - let transaction_service = TransactionService::new(TransactionServiceConfig { cluster: rpc_endpoint.clone(), db_instance: db.clone(), @@ -103,20 +96,46 @@ impl TinyDancer { let ui_service = if enable_ui_service || tui_monitor { Some(UiService::new(UiConfig { - client_status, + client_status: client_status_ui, enable_ui_service, tui_monitor, })) } else { None }; + // run the sampling service + if !consensus_mode { + let sample_service_config = SampleServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config: archive_config.clone(), + instance: db.clone(), + client_status: client_status.clone(), + sample_qty, + }; + + let sample_service = SampleService::new(sample_service_config); + sample_service + .join() + .await + .expect("error in sample service thread"); + } + if consensus_mode { + let consensus_service_config = ConsensusServiceConfig { + cluster: rpc_endpoint.clone(), + archive_config, + instance: db.clone(), + client_status, + sample_qty, + }; - // run - sample_service - .join() - .await - .expect("error in sample service thread"); + let consensus_service = ConsensusService::new(consensus_service_config); + // run the consensus service + consensus_service + .join() + .await + .expect("error in consensus service thread"); + } transaction_service .join() .await @@ -147,6 +166,7 @@ pub fn endpoint(cluster: Cluster) -> String { Cluster::Custom(url) => url, } } +#[derive(Clone, PartialEq, Debug)] pub enum ClientStatus { Initializing(String), SearchingForRPCService(String), diff --git a/tinydancer/src/ui/ui.rs b/tinydancer/src/ui/ui.rs index c4703f6..5bcf9fd 100644 --- a/tinydancer/src/ui/ui.rs +++ b/tinydancer/src/ui/ui.rs @@ -1,3 +1,4 @@ +use crate::block_on_async; use crate::sampler::GetShredResponse; use crate::tinydancer::{ClientService, ClientStatus, TinyDancer}; use async_trait::async_trait; @@ -8,13 +9,14 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; use spinoff::{spinners, Color as SpinColor, Spinner}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; use std::{any::Any, thread::Thread}; use std::{fmt, thread::JoinHandle}; use thiserror::Error; use tiny_logger::logs::info; +use tokio::sync::{Mutex, MutexGuard}; use tui::layout::Rect; use tui::style::{Color, Modifier, Style}; use tui::text::{Span, Spans}; @@ -225,12 +227,30 @@ impl ClientService for UiService { threads.push(std::thread::spawn(move || loop { sleep(Duration::from_millis(100)); + enable_raw_mode(); + if crossterm::event::poll(Duration::from_millis(100)).unwrap() { + let ev = crossterm::event::read().unwrap(); + if ev + == Event::Key(KeyEvent { + code: KeyCode::Char('c'), + modifiers: KeyModifiers::CONTROL, + kind: KeyEventKind::Press, + state: KeyEventState::NONE, + }) + { + let mut status = block_on_async!(client_status.lock()); + *status = ClientStatus::ShuttingDown(String::from( + "Shutting Down Gracefully...", + )); + drop(status); + disable_raw_mode(); + } + } + let status = block_on_async!(client_status.lock()); - let status = client_status.lock().unwrap(); match &*status { ClientStatus::Active(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Green); - // sleep(Duration::from_secs(100)); } ClientStatus::Initializing(msg) => { spinner.update(spinners::Dots, msg.clone(), SpinColor::Yellow); @@ -245,27 +265,7 @@ impl ClientService for UiService { } _ => {} } - Mutex::unlock(status); - enable_raw_mode(); - if crossterm::event::poll(Duration::from_millis(100)).unwrap() { - let ev = crossterm::event::read().unwrap(); - - if ev - == Event::Key(KeyEvent { - code: KeyCode::Char('c'), - modifiers: KeyModifiers::CONTROL, - kind: KeyEventKind::Press, - state: KeyEventState::NONE, - }) - { - let mut status = client_status.lock().unwrap(); - *status = ClientStatus::ShuttingDown(String::from( - "Shutting Down Gracefully...", - )); - Mutex::unlock(status); - disable_raw_mode(); - } - } + drop(status); })); }