Skip to content

Commit 963249a

Browse files
feat: finish vote aggregation service
Co-authored-by: Anoushk Kharangate <[email protected]>
1 parent eda1991 commit 963249a

File tree

4 files changed

+158
-50
lines changed

4 files changed

+158
-50
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/validator.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! The `validator` module hosts all the validator microservices.
22
33
pub use solana_perf::report_target_features;
4+
use solana_rpc::votes_background_service::{VoteAggregatorService, VoteAggregatorServiceConfig};
45
use {
56
crate::{
67
accounts_hash_verifier::{AccountsHashFaultInjector, AccountsHashVerifier},
@@ -437,6 +438,7 @@ struct TransactionHistoryServices {
437438
max_complete_rewards_slot: Arc<AtomicU64>,
438439
cache_block_meta_sender: Option<CacheBlockMetaSender>,
439440
cache_block_meta_service: Option<CacheBlockMetaService>,
441+
vote_aggregator_service: Option<VoteAggregatorService>
440442
}
441443

442444
pub struct Validator {
@@ -446,6 +448,7 @@ pub struct Validator {
446448
rpc_completed_slots_service: JoinHandle<()>,
447449
optimistically_confirmed_bank_tracker: Option<OptimisticallyConfirmedBankTracker>,
448450
transaction_status_service: Option<TransactionStatusService>,
451+
vote_aggregator_service: Option<VoteAggregatorService>,
449452
rewards_recorder_service: Option<RewardsRecorderService>,
450453
cache_block_meta_service: Option<CacheBlockMetaService>,
451454
entry_notifier_service: Option<EntryNotifierService>,
@@ -693,6 +696,7 @@ impl Validator {
693696
max_complete_rewards_slot,
694697
cache_block_meta_sender,
695698
cache_block_meta_service,
699+
vote_aggregator_service
696700
},
697701
blockstore_process_options,
698702
blockstore_root_scan,
@@ -730,7 +734,7 @@ impl Validator {
730734
));
731735
}
732736
}
733-
737+
734738
let mut cluster_info = ClusterInfo::new(
735739
node.info.clone(),
736740
identity_keypair.clone(),
@@ -768,6 +772,7 @@ impl Validator {
768772
(None, None)
769773
};
770774

775+
771776
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
772777
let accounts_hash_verifier = AccountsHashVerifier::new(
773778
accounts_package_sender.clone(),
@@ -1385,6 +1390,7 @@ impl Validator {
13851390
repair_quic_endpoint,
13861391
repair_quic_endpoint_runtime,
13871392
repair_quic_endpoint_join_handle,
1393+
vote_aggregator_service
13881394
})
13891395
}
13901396

@@ -2174,7 +2180,7 @@ fn initialize_rpc_transaction_history_services(
21742180
sender: transaction_status_sender,
21752181
});
21762182
let transaction_status_service = Some(TransactionStatusService::new(
2177-
transaction_status_receiver,
2183+
transaction_status_receiver.clone(),
21782184
max_complete_transaction_status_slot.clone(),
21792185
enable_rpc_transaction_history,
21802186
transaction_notifier,
@@ -2200,6 +2206,8 @@ fn initialize_rpc_transaction_history_services(
22002206
blockstore,
22012207
exit,
22022208
));
2209+
let transaction_status_receiver = Arc::new(transaction_status_receiver);
2210+
let vote_aggregator_service = Some(VoteAggregatorService::new(VoteAggregatorServiceConfig{}, transaction_status_receiver, Arc::new(AtomicBool::new(false))));
22032211
TransactionHistoryServices {
22042212
transaction_status_sender,
22052213
transaction_status_service,
@@ -2209,6 +2217,7 @@ fn initialize_rpc_transaction_history_services(
22092217
max_complete_rewards_slot,
22102218
cache_block_meta_sender,
22112219
cache_block_meta_service,
2220+
vote_aggregator_service
22122221
}
22132222
}
22142223

rpc/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ solana-transaction-status = { workspace = true }
5353
solana-version = { workspace = true }
5454
solana-vote = { workspace = true }
5555
solana-vote-program = { workspace = true }
56+
solana-program = { workspace = true }
5657
spl-token = { workspace = true, features = ["no-entrypoint"] }
5758
spl-token-2022 = { workspace = true, features = ["no-entrypoint"] }
5859
stream-cancel = { workspace = true }

rpc/src/votes_background_service.rs

+145-48
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,128 @@
1-
use std::{collections::HashMap, thread::{JoinHandle, Builder}, sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
21
use crossbeam_channel::{Receiver, RecvTimeoutError};
3-
use rayon::iter::{IntoParallelRefIterator, ParallelIterator, IntoParallelIterator};
2+
use jsonrpc_core::futures_util::future::Join;
3+
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
44
use solana_ledger::blockstore_processor::TransactionStatusMessage;
5-
use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction, message::SanitizedMessage, slot_history::Slot, signature::Signature};
6-
use solana_vote::vote_parser::parse_sanitized_vote_transaction;
5+
use solana_program::hash::Hash;
6+
use solana_sdk::{
7+
message::SanitizedMessage, pubkey::Pubkey, signature::Signature, slot_history::Slot,
8+
transaction::SanitizedTransaction,
9+
};
10+
use solana_vote::{
11+
vote_parser::{parse_sanitized_vote_transaction, ParsedVote},
12+
vote_transaction::VoteTransaction,
13+
};
714
use solana_vote_program::vote_state::VoteState;
15+
use std::str::FromStr;
16+
use std::{
17+
collections::HashMap,
18+
sync::{
19+
atomic::{AtomicBool, Ordering},
20+
Arc,
21+
},
22+
thread::{Builder, JoinHandle},
23+
time::Duration,
24+
};
25+
use dashmap::DashMap;
26+
27+
pub const LIGHT_CLIENT_PROGRAM: &str = "3UVYmECPPMZSCqWKfENfuoTv51fTDTWicX9xmBD2euKe";
828
#[derive(Debug, Clone)]
9-
pub struct VoteAggregatorServiceConfig{
29+
pub struct VoteAggregatorServiceConfig {
1030
//This would be our "Copy-on-chain" program address
11-
program_of_interest: Pubkey,
12-
validator_set: HashMap<Pubkey, u64>,
31+
// program_of_interest: Pubkey,
32+
// validator_set: HashMap<Pubkey, u64>,
1333
}
1434

1535

16-
pub struct VoteAggregatorService{
17-
thread_hdl: JoinHandle<()>
36+
pub struct VoteAggregatorService {
37+
thread_hdl: JoinHandle<()>,
38+
// logger: JoinHandle<()>,
39+
votedb: Arc<DashMap<(Slot, Hash), Vec<Signature>>>,
1840
}
1941

42+
// Need a thread pool builder using Rayon
43+
//
2044
impl VoteAggregatorService {
2145
pub fn new(
2246
config: VoteAggregatorServiceConfig,
23-
transaction_status_receiver: &Receiver<TransactionStatusMessage>,
47+
transaction_status_receiver: Arc<Receiver<TransactionStatusMessage>>,
2448
exit: Arc<AtomicBool>,
25-
) -> Self{
26-
let thread_hdl = Builder::new().name("votesAggService".to_string()).spawn(move ||
27-
loop {
28-
if exit.load(Ordering::Relaxed){
29-
break;
49+
) -> Self {
50+
let mut votedb: Arc<DashMap<(Slot, Hash), Vec<Signature>>> = Arc::new(DashMap::default());
51+
let votedb_t = Arc::clone(&votedb);
52+
let thread_hdl = Builder::new()
53+
.name("votesAggService".to_string())
54+
.spawn(move || loop {
55+
if exit.load(Ordering::Relaxed) {
56+
break;
57+
}
58+
59+
// listens to receiver channel
60+
// X - if it receives status message then filter transaction of interest from batches
61+
// filter vote txns then parse the data and read the slot and bankhash that they voted on
62+
// store in hashmap
63+
/*
64+
pub enum VoteTransaction {
65+
Vote(Vote),
66+
VoteStateUpdate(VoteStateUpdate),
3067
}
3168
69+
pub struct Vote {
70+
/// A stack of votes starting with the oldest vote
71+
pub slots: Vec<Slot>,
72+
/// signature of the bank's state at the last slot
73+
pub hash: Hash,
74+
/// processing timestamp of last slot
75+
pub timestamp: Option<UnixTimestamp>,
76+
}
3277
3378
34-
}
35-
).unwrap();
36-
Self { thread_hdl }
79+
pub struct VoteStateUpdate {
80+
/// The proposed tower
81+
pub lockouts: VecDeque<Lockout>,
82+
/// The proposed root
83+
pub root: Option<Slot>,
84+
/// signature of the bank's state at the last slot
85+
pub hash: Hash,
86+
/// processing timestamp of last slot
87+
pub timestamp: Option<UnixTimestamp>,
88+
}
89+
*/
90+
let vote_txns = VoteAggregatorService::filter_vote_transactions(
91+
transaction_status_receiver.clone(),
92+
);
93+
match vote_txns {
94+
Ok(votes) => {
95+
let parsed_votes: Vec<ParsedVote> = votes
96+
.iter()
97+
.map(|tx| parse_sanitized_vote_transaction(tx))
98+
.flatten()
99+
.collect();
100+
let _ = parsed_votes.into_iter().map(|v| {
101+
let key = (v.1.slots().last().unwrap().to_owned(), v.1.hash());
102+
let binding = votedb_t.get(&key);
103+
let maybe_prev_entry: Option<&Vec<Signature>> =
104+
binding.as_deref().clone();
105+
if let Some(prev_entry) = maybe_prev_entry {
106+
let mut new_entry = prev_entry.clone();
107+
new_entry.push(v.3);
108+
votedb_t.insert((v.1.slots().last().unwrap().to_owned(), v.1.hash()), new_entry.clone());
109+
info!("vote_aggregator_service, {:?}, {:?}, {:?}",v.1.slots().last().unwrap().to_owned(), v.1.hash(), new_entry);
110+
} else {
111+
votedb_t.insert((v.1.slots().last().unwrap().to_owned(), v.1.hash()), vec![v.3]);
112+
info!("vote_aggregator_service {:?}, {:?}, {:?} ",v.1.slots().last().unwrap().to_owned(), v.1.hash(), vec![v.3]);
113+
}
114+
});
115+
}
116+
_ => {}
117+
}
118+
// let display1:Vec<&Vec<Signature>> = votedb.iter().map(|v| v.value()).collect();
119+
120+
})
121+
.unwrap();
122+
Self {
123+
thread_hdl,
124+
votedb,
125+
}
37126
}
38127

39128
pub fn join(self) -> std::thread::Result<()> {
@@ -42,48 +131,59 @@ impl VoteAggregatorService {
42131

43132
// filters by signature
44133
pub fn filter_transaction_of_interest(
45-
transaction_status_receiver: &Receiver<TransactionStatusMessage>,
46-
t_o_i_signature: &Signature,
47-
) -> Result<SanitizedTransaction, RecvTimeoutError>{
134+
transaction_status_receiver: Arc<Receiver<TransactionStatusMessage>>,
135+
// t_o_i_pubkey: &Pubkey,
136+
) -> Result<Option<SanitizedTransaction>, RecvTimeoutError> {
48137
match transaction_status_receiver.recv_timeout(Duration::from_secs(1)) {
49-
Ok(TransactionStatusMessage::Batch(batch)) => {
138+
Ok(TransactionStatusMessage::Batch(batch)) => {
50139
// filter out vote transactions as we dont need them.
51-
let filter_txs: Vec<_> = batch.transactions.par_iter().filter_map(|t|{
52-
if !t.is_simple_vote_transaction(){
53-
Some(t)
54-
} else {
55-
None
56-
}
57-
}).collect();
140+
let txns = batch.transactions.clone();
141+
let filter_txs: Vec<_> = txns
142+
.into_par_iter()
143+
.filter_map(|t| {
144+
if !t.is_simple_vote_transaction() {
145+
Some(t)
146+
} else {
147+
None
148+
}
149+
})
150+
.collect();
58151

59152
let transaction_of_interest = filter_txs.into_par_iter().find_any(|t| {
60-
t.signature() == t_o_i_signature
61-
}).unwrap();
62-
153+
t.message()
154+
.account_keys()
155+
.iter()
156+
.find(|key| key == &&Pubkey::from_str(LIGHT_CLIENT_PROGRAM).unwrap())
157+
.is_some()
158+
});
159+
63160
Ok(transaction_of_interest.clone())
64-
},
161+
}
65162
//TODO: can handle this case in a better way.
66-
Ok(TransactionStatusMessage::Freeze(_)) => { Err(RecvTimeoutError::Timeout)},
67-
Err(e) => Err(e),
163+
Ok(TransactionStatusMessage::Freeze(_)) => Err(RecvTimeoutError::Timeout),
164+
Err(e) => Err(e),
68165
}
69-
70166
}
71167

72168
pub fn filter_vote_transactions(
73-
receiver: &Receiver<TransactionStatusMessage>,
169+
receiver: Arc<Receiver<TransactionStatusMessage>>,
74170
) -> Result<Vec<SanitizedTransaction>, RecvTimeoutError> {
75171
match receiver.recv_timeout(Duration::from_secs(1)) {
76172
Ok(msg) => match msg {
77173
TransactionStatusMessage::Batch(batch) => {
78-
let filtered_txs: Vec<_> = batch.transactions.into_par_iter().filter_map(|t| {
79-
if t.is_simple_vote_transaction() {
80-
Some(t)
81-
} else {
82-
None
83-
}
84-
}).collect();
174+
let filtered_txs: Vec<_> = batch
175+
.transactions
176+
.into_par_iter()
177+
.filter_map(|t| {
178+
if t.is_simple_vote_transaction() {
179+
Some(t)
180+
} else {
181+
None
182+
}
183+
})
184+
.collect();
85185
Ok(filtered_txs)
86-
},
186+
}
87187
_ => Ok(Vec::new()), // Return an empty vector for non-Batch messages
88188
},
89189
Err(err) => Err(err), // Handle the receive error
@@ -94,7 +194,4 @@ impl VoteAggregatorService {
94194
// ) -> Vec<VoteState>{
95195

96196
// }
97-
98-
99197
}
100-

0 commit comments

Comments
 (0)