Skip to content

Commit eda1991

Browse files
committed
feat: add transaction of interest and vote filter helpers
1 parent 17feb5b commit eda1991

File tree

1 file changed

+43
-42
lines changed

1 file changed

+43
-42
lines changed

rpc/src/votes_background_service.rs

+43-42
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::{collections::HashMap, thread::{JoinHandle, Builder}, sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
22
use crossbeam_channel::{Receiver, RecvTimeoutError};
3-
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
3+
use rayon::iter::{IntoParallelRefIterator, ParallelIterator, IntoParallelIterator};
44
use solana_ledger::blockstore_processor::TransactionStatusMessage;
5-
use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction, message::SanitizedMessage};
6-
7-
5+
use solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction, message::SanitizedMessage, slot_history::Slot, signature::Signature};
6+
use solana_vote::vote_parser::parse_sanitized_vote_transaction;
7+
use solana_vote_program::vote_state::VoteState;
88
#[derive(Debug, Clone)]
99
pub struct VoteAggregatorServiceConfig{
1010
//This would be our "Copy-on-chain" program address
@@ -40,60 +40,61 @@ impl VoteAggregatorService {
4040
self.thread_hdl.join()
4141
}
4242

43+
// filters by signature
4344
pub fn filter_transaction_of_interest(
4445
transaction_status_receiver: &Receiver<TransactionStatusMessage>,
45-
t_o_i_pubkey: Pubkey,
46+
t_o_i_signature: &Signature,
4647
) -> Result<SanitizedTransaction, RecvTimeoutError>{
47-
match transaction_status_receiver.recv_timeout(Duration::from_secs(1))? {
48-
TransactionStatusMessage::Batch(batch) => {
49-
48+
match transaction_status_receiver.recv_timeout(Duration::from_secs(1)) {
49+
Ok(TransactionStatusMessage::Batch(batch)) => {
5050
// filter out vote transactions as we dont need them.
5151
let filter_txs: Vec<_> = batch.transactions.par_iter().filter_map(|t|{
5252
if !t.is_simple_vote_transaction(){
5353
Some(t)
5454
} else {
55-
None
55+
None
5656
}
5757
}).collect();
5858

59-
// extract out `TransactionMessage`s from the filtered transactions.
60-
let extracted_tx_messages: Vec<_> = filter_txs.par_iter().map(|t|{
61-
t.message()
62-
}).collect();
63-
64-
for m in extracted_tx_messages.iter(){
65-
// any operation on m in this block
66-
match m {
67-
SanitizedMessage::Legacy(m) => {
68-
let txs = m.message.account_keys.par_iter().for_each(|k| k == t_o_i_pubkey).collect();
69-
},
70-
SanitizedMessage::V0(m) => {
71-
72-
},
73-
}
74-
}
75-
76-
let tx = extracted_tx_messages.par_iter().map(|m|{
77-
match m{
78-
SanitizedMessage::Legacy(m) => {
79-
if m.message.account_keys.par_iter()
80-
},
81-
SanitizedMessage::V0(m) => {
82-
83-
},
84-
}
85-
})
86-
87-
88-
89-
90-
Ok(())
59+
let transaction_of_interest = filter_txs.into_par_iter().find_any(|t| {
60+
t.signature() == t_o_i_signature
61+
}).unwrap();
62+
63+
Ok(transaction_of_interest.clone())
9164
},
92-
TransactionStatusMessage::Freeze(slot) => todo!(),
65+
//TODO: can handle this case in a better way.
66+
Ok(TransactionStatusMessage::Freeze(_)) => { Err(RecvTimeoutError::Timeout)},
67+
Err(e) => Err(e),
9368
}
9469

9570
}
9671

72+
pub fn filter_vote_transactions(
73+
receiver: &Receiver<TransactionStatusMessage>,
74+
) -> Result<Vec<SanitizedTransaction>, RecvTimeoutError> {
75+
match receiver.recv_timeout(Duration::from_secs(1)) {
76+
Ok(msg) => match msg {
77+
TransactionStatusMessage::Batch(batch) => {
78+
let filtered_txs: Vec<_> = batch.transactions.into_par_iter().filter_map(|t| {
79+
if t.is_simple_vote_transaction() {
80+
Some(t)
81+
} else {
82+
None
83+
}
84+
}).collect();
85+
Ok(filtered_txs)
86+
},
87+
_ => Ok(Vec::new()), // Return an empty vector for non-Batch messages
88+
},
89+
Err(err) => Err(err), // Handle the receive error
90+
}
91+
}
92+
// pub fn get_votes_for_slot(
93+
// slot: u64,
94+
// ) -> Vec<VoteState>{
95+
96+
// }
97+
9798

9899
}
99100

0 commit comments

Comments
 (0)