Skip to content

Commit 17feb5b

Browse files
committed
add vote transaction filtering: wip
1 parent 8851137 commit 17feb5b

File tree

1 file changed

+85
-8
lines changed

1 file changed

+85
-8
lines changed

rpc/src/votes_background_service.rs

+85-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
use std::{collections::{HashSet, HashMap}, thread::JoinHandle};
2-
use solana_sdk::pubkey::Pubkey;
1+
use std::{collections::HashMap, thread::{JoinHandle, Builder}, sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
2+
use crossbeam_channel::{Receiver, RecvTimeoutError};
3+
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
4+
use solana_ledger::blockstore_processor::TransactionStatusMessage;
5+
use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction, message::SanitizedMessage};
36

47

58
#[derive(Debug, Clone)]
@@ -14,9 +17,83 @@ pub struct VoteAggregatorService{
1417
thread_hdl: JoinHandle<()>
1518
}
1619

17-
impl VoteAggregatorService{
18-
pub fn new() -> Self{
19-
// let thread_hdl =
20-
Self { thread_hdl: () }
21-
}
22-
}
20+
impl VoteAggregatorService {
21+
pub fn new(
22+
config: VoteAggregatorServiceConfig,
23+
transaction_status_receiver: &Receiver<TransactionStatusMessage>,
24+
exit: Arc<AtomicBool>,
25+
) -> Self{
26+
let thread_hdl = Builder::new().name("votesAggService".to_string()).spawn(move ||
27+
loop {
28+
if exit.load(Ordering::Relaxed){
29+
break;
30+
}
31+
32+
33+
34+
}
35+
).unwrap();
36+
Self { thread_hdl }
37+
}
38+
39+
pub fn join(self) -> std::thread::Result<()> {
40+
self.thread_hdl.join()
41+
}
42+
43+
pub fn filter_transaction_of_interest(
44+
transaction_status_receiver: &Receiver<TransactionStatusMessage>,
45+
t_o_i_pubkey: Pubkey,
46+
) -> Result<SanitizedTransaction, RecvTimeoutError>{
47+
match transaction_status_receiver.recv_timeout(Duration::from_secs(1))? {
48+
TransactionStatusMessage::Batch(batch) => {
49+
50+
// filter out vote transactions as we dont need them.
51+
let filter_txs: Vec<_> = batch.transactions.par_iter().filter_map(|t|{
52+
if !t.is_simple_vote_transaction(){
53+
Some(t)
54+
} else {
55+
None
56+
}
57+
}).collect();
58+
59+
// extract out `TransactionMessage`s from the filtered transactions.
60+
let extracted_tx_messages: Vec<_> = filter_txs.par_iter().map(|t|{
61+
t.message()
62+
}).collect();
63+
64+
for m in extracted_tx_messages.iter(){
65+
// any operation on m in this block
66+
match m {
67+
SanitizedMessage::Legacy(m) => {
68+
let txs = m.message.account_keys.par_iter().for_each(|k| k == t_o_i_pubkey).collect();
69+
},
70+
SanitizedMessage::V0(m) => {
71+
72+
},
73+
}
74+
}
75+
76+
let tx = extracted_tx_messages.par_iter().map(|m|{
77+
match m{
78+
SanitizedMessage::Legacy(m) => {
79+
if m.message.account_keys.par_iter()
80+
},
81+
SanitizedMessage::V0(m) => {
82+
83+
},
84+
}
85+
})
86+
87+
88+
89+
90+
Ok(())
91+
},
92+
TransactionStatusMessage::Freeze(slot) => todo!(),
93+
}
94+
95+
}
96+
97+
98+
}
99+

0 commit comments

Comments
 (0)