Skip to content

Commit 75adeee

Browse files
committed
refactor(rpc)!: update mempool interface and test code WIP
1 parent 18fad27 commit 75adeee

File tree

4 files changed

+109
-61
lines changed

4 files changed

+109
-61
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 72 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@
1010
#![warn(missing_docs)]
1111

1212
use bdk_core::{BlockId, CheckPoint};
13-
use bitcoin::{block::Header, Block, BlockHash, Transaction};
13+
use bitcoin::{block::Header, Block, BlockHash, Transaction, Txid};
1414
use bitcoincore_rpc::bitcoincore_rpc_json;
15+
use std::collections::HashSet;
1516

1617
pub mod bip158;
1718

@@ -64,18 +65,21 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
6465
}
6566
}
6667

67-
/// Emit mempool transactions, alongside their first-seen unix timestamps.
68+
/// Emit mempool transactions and capture the initial snapshot of all mempool [`Txid`]s.
6869
///
69-
/// This method emits each transaction only once, unless we cannot guarantee the transaction's
70-
/// ancestors are already emitted.
70+
/// This method returns a [`MempoolEvent`] containing the full transactions (with their
71+
/// first-seen unix timestamps) that were emitted, and the set of all [`Txid`]s present from the
72+
/// initial mempool query. Each transaction is emitted only once, unless we cannot guarantee the
73+
/// transaction's ancestors are already emitted.
7174
///
7275
/// To understand why, consider a receiver which filters transactions based on whether it
7376
/// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a
7477
/// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block
7578
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
7679
/// at height `h`.
77-
pub fn mempool(&mut self) -> Result<Vec<(Transaction, u64)>, bitcoincore_rpc::Error> {
80+
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
7881
let client = self.client;
82+
let mut emitted_txs = Vec::new();
7983

8084
// This is the emitted tip height during the last mempool emission.
8185
let prev_mempool_tip = self
@@ -91,44 +95,49 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
9195
let prev_mempool_time = self.last_mempool_time;
9296
let mut latest_time = prev_mempool_time;
9397

94-
let txs_to_emit = client
95-
.get_raw_mempool_verbose()?
96-
.into_iter()
97-
.filter_map({
98-
let latest_time = &mut latest_time;
99-
move |(txid, tx_entry)| -> Option<Result<_, bitcoincore_rpc::Error>> {
100-
let tx_time = tx_entry.time as usize;
101-
if tx_time > *latest_time {
102-
*latest_time = tx_time;
103-
}
98+
// Get the raw mempool result from the RPC client.
99+
let raw_mempool = client.get_raw_mempool_verbose()?;
100+
let raw_mempool_txids = raw_mempool.keys().copied().collect::<HashSet<_>>();
104101

105-
// Avoid emitting transactions that are already emitted if we can guarantee
106-
// blocks containing ancestors are already emitted. The bitcoind rpc interface
107-
// provides us with the block height that the tx is introduced to the mempool.
108-
// If we have already emitted the block of height, we can assume that all
109-
// ancestor txs have been processed by the receiver.
110-
let is_already_emitted = tx_time <= prev_mempool_time;
111-
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
112-
if is_already_emitted && is_within_height {
113-
return None;
114-
}
102+
for (txid, tx_entry) in raw_mempool.into_iter() {
103+
let tx_time = tx_entry.time as usize;
104+
if tx_time > latest_time {
105+
latest_time = tx_time;
106+
}
115107

116-
let tx = match client.get_raw_transaction(&txid, None) {
117-
Ok(tx) => tx,
118-
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
119-
Err(err) if err.is_not_found_error() => return None,
120-
Err(err) => return Some(Err(err)),
121-
};
108+
// Avoid emitting transactions that are already emitted if we can guarantee blocks
109+
// containing ancestors are already emitted. The bitcoind rpc interface provides us with
110+
// the block height that the tx is introduced to the mempool. If we have already emitted
111+
// the block of height, we can assume that all ancestor txs have been processed by the
112+
// receiver.
113+
let is_already_emitted = tx_time <= prev_mempool_time;
114+
let is_within_height = tx_entry.height <= prev_mempool_tip as _;
115+
if is_already_emitted && is_within_height {
116+
continue;
117+
}
122118

123-
Some(Ok((tx, tx_time as u64)))
119+
match client.get_raw_transaction(&txid, None) {
120+
Ok(tx) => {
121+
emitted_txs.push((tx, tx_time as u64));
122+
}
123+
Err(err) => {
124+
if err.is_not_found_error() {
125+
// the tx is confirmed or evicted since `get_raw_mempool_verbose`
126+
continue;
127+
} else {
128+
return Err(err);
129+
}
124130
}
125-
})
126-
.collect::<Result<Vec<_>, _>>()?;
131+
};
132+
}
127133

128134
self.last_mempool_time = latest_time;
129135
self.last_mempool_tip = Some(self.last_cp.height());
130136

131-
Ok(txs_to_emit)
137+
Ok(MempoolEvent {
138+
emitted_txs,
139+
raw_mempool_txids,
140+
})
132141
}
133142

134143
/// Emit the next block height and header (if any).
@@ -144,6 +153,34 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
144153
}
145154
}
146155

156+
/// A new emission from mempool.
157+
#[derive(Debug)]
158+
pub struct MempoolEvent {
159+
/// Emitted mempool transactions with their first‐seen unix timestamps.
160+
pub emitted_txs: Vec<(Transaction, u64)>,
161+
162+
/// Set of all [`Txid`]s from the raw mempool result, including transactions that may have been
163+
/// confirmed or evicted during processing. This is used to determine which expected
164+
/// transactions are missing.
165+
pub raw_mempool_txids: HashSet<Txid>,
166+
}
167+
168+
impl MempoolEvent {
169+
/// Given an iterator of expected [`Txid`]s, return those that are missing from the mempool.
170+
pub fn evicted_txids(
171+
&self,
172+
expected_unconfirmed_txids: impl IntoIterator<Item = Txid>,
173+
) -> HashSet<Txid> {
174+
let expected_set = expected_unconfirmed_txids
175+
.into_iter()
176+
.collect::<HashSet<_>>();
177+
expected_set
178+
.difference(&self.raw_mempool_txids)
179+
.copied()
180+
.collect()
181+
}
182+
}
183+
147184
/// A newly emitted block from [`Emitter`].
148185
#[derive(Debug)]
149186
pub struct BlockEvent<B> {

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
189189
assert!(emitter.next_block()?.is_none());
190190

191191
let mempool_txs = emitter.mempool()?;
192-
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs);
192+
let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.emitted_txs);
193193
assert_eq!(
194194
indexed_additions
195195
.tx_graph
@@ -437,6 +437,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
437437
// the first emission should include all transactions
438438
let emitted_txids = emitter
439439
.mempool()?
440+
.emitted_txs
440441
.into_iter()
441442
.map(|(tx, _)| tx.compute_txid())
442443
.collect::<BTreeSet<Txid>>();
@@ -447,7 +448,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
447448

448449
// second emission should be empty
449450
assert!(
450-
emitter.mempool()?.is_empty(),
451+
emitter.mempool()?.emitted_txs.is_empty(),
451452
"second emission should be empty"
452453
);
453454

@@ -457,7 +458,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
457458
}
458459
while emitter.next_header()?.is_some() {}
459460
assert!(
460-
emitter.mempool()?.is_empty(),
461+
emitter.mempool()?.emitted_txs.is_empty(),
461462
"third emission, after chain tip is extended, should also be empty"
462463
);
463464

@@ -506,6 +507,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
506507
assert_eq!(
507508
emitter
508509
.mempool()?
510+
.emitted_txs
509511
.into_iter()
510512
.map(|(tx, _)| tx.compute_txid())
511513
.collect::<BTreeSet<_>>(),
@@ -515,6 +517,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
515517
assert_eq!(
516518
emitter
517519
.mempool()?
520+
.emitted_txs
518521
.into_iter()
519522
.map(|(tx, _)| tx.compute_txid())
520523
.collect::<BTreeSet<_>>(),
@@ -535,6 +538,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
535538
.collect::<BTreeSet<_>>();
536539
let emitted_txids = emitter
537540
.mempool()?
541+
.emitted_txs
538542
.into_iter()
539543
.map(|(tx, _)| tx.compute_txid())
540544
.collect::<BTreeSet<_>>();
@@ -593,6 +597,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
593597
assert_eq!(
594598
emitter
595599
.mempool()?
600+
.emitted_txs
596601
.into_iter()
597602
.map(|(tx, _)| tx.compute_txid())
598603
.collect::<BTreeSet<_>>(),
@@ -628,6 +633,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
628633
// include mempool txs introduced at reorg height or greater
629634
let mempool = emitter
630635
.mempool()?
636+
.emitted_txs
631637
.into_iter()
632638
.map(|(tx, _)| tx.compute_txid())
633639
.collect::<BTreeSet<_>>();
@@ -643,6 +649,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
643649

644650
let mempool = emitter
645651
.mempool()?
652+
.emitted_txs
646653
.into_iter()
647654
.map(|(tx, _)| tx.compute_txid())
648655
.collect::<BTreeSet<_>>();
@@ -766,7 +773,7 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
766773
)?;
767774

768775
let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1);
769-
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?);
776+
let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.emitted_txs);
770777
assert!(changeset
771778
.tx_graph
772779
.txs
@@ -806,23 +813,29 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> {
806813
// Send the tx.
807814
let txid_2 = core.send_raw_transaction(&tx1b)?;
808815

809-
// We evict the expected txs that are missing from mempool.
810-
let exp_txids = graph
816+
// Retrieve the expected unconfirmed txids and spks from the graph.
817+
let exp_spk_txids = graph
811818
.expected_unconfirmed_spk_txids(&chain, chain_tip, ..)?
812819
.collect::<Vec<_>>();
813-
assert_eq!(exp_txids, vec![(txid_1, spk)]);
814-
let mempool = emitter
815-
.mempool()?
816-
.into_iter()
820+
assert_eq!(exp_spk_txids, vec![(txid_1, spk)]);
821+
822+
// Check that mempool emission contains evicted txid.
823+
let mempool_event = emitter.mempool()?;
824+
let unseen_txids: Vec<Txid> = mempool_event
825+
.emitted_txs
826+
.iter()
817827
.map(|(tx, _)| tx.compute_txid())
818-
.collect::<Vec<_>>();
819-
assert!(mempool.contains(&txid_2));
820-
for (txid, _) in exp_txids {
821-
if !mempool.contains(&txid) {
822-
let _ = graph.insert_evicted_at(txid, seen_at + 1);
823-
}
828+
.collect();
829+
assert!(unseen_txids.contains(&txid_2));
830+
831+
// Update graph with evicted tx.
832+
let exp_txids = exp_spk_txids.into_iter().map(|(txid, _)| txid);
833+
let evicted_txids = mempool_event.evicted_txids(exp_txids);
834+
for txid in evicted_txids {
835+
let _ = graph.insert_evicted_at(txid, seen_at + 1);
824836
}
825837

838+
// tx1 should no longer be canonical.
826839
assert!(graph
827840
.graph()
828841
.list_canonical_txs(&chain, chain_tip)

example-crates/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,7 @@ use bdk_bitcoind_rpc::{
1111
bitcoincore_rpc::{Auth, Client, RpcApi},
1212
Emitter,
1313
};
14-
use bdk_chain::{
15-
bitcoin::{Block, Transaction},
16-
local_chain, Merge,
17-
};
14+
use bdk_chain::{bitcoin::Block, local_chain, Merge};
1815
use example_cli::{
1916
anyhow,
2017
clap::{self, Args, Subcommand},
@@ -36,7 +33,7 @@ const DB_COMMIT_DELAY: Duration = Duration::from_secs(60);
3633
#[derive(Debug)]
3734
enum Emission {
3835
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
39-
Mempool(Vec<(Transaction, u64)>),
36+
Mempool(bdk_bitcoind_rpc::MempoolEvent),
4037
Tip(u32),
4138
}
4239

@@ -204,7 +201,7 @@ fn main() -> anyhow::Result<()> {
204201
let graph_changeset = graph
205202
.lock()
206203
.unwrap()
207-
.batch_insert_relevant_unconfirmed(mempool_txs);
204+
.batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs);
208205
{
209206
let db = &mut *db.lock().unwrap();
210207
db_stage.merge(ChangeSet {
@@ -287,7 +284,8 @@ fn main() -> anyhow::Result<()> {
287284
(chain_changeset, graph_changeset)
288285
}
289286
Emission::Mempool(mempool_txs) => {
290-
let graph_changeset = graph.batch_insert_relevant_unconfirmed(mempool_txs);
287+
let graph_changeset =
288+
graph.batch_insert_relevant_unconfirmed(mempool_txs.emitted_txs);
291289
(local_chain::ChangeSet::default(), graph_changeset)
292290
}
293291
Emission::Tip(h) => {

example-crates/example_wallet_rpc/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use bdk_bitcoind_rpc::{
33
Emitter,
44
};
55
use bdk_wallet::{
6-
bitcoin::{Block, Network, Transaction},
6+
bitcoin::{Block, Network},
77
file_store::Store,
88
KeychainKind, Wallet,
99
};
@@ -73,7 +73,7 @@ impl Args {
7373
enum Emission {
7474
SigTerm,
7575
Block(bdk_bitcoind_rpc::BlockEvent<Block>),
76-
Mempool(Vec<(Transaction, u64)>),
76+
Mempool(bdk_bitcoind_rpc::MempoolEvent),
7777
}
7878

7979
fn main() -> anyhow::Result<()> {
@@ -157,7 +157,7 @@ fn main() -> anyhow::Result<()> {
157157
}
158158
Emission::Mempool(mempool_emission) => {
159159
let start_apply_mempool = Instant::now();
160-
wallet.apply_unconfirmed_txs(mempool_emission);
160+
wallet.apply_unconfirmed_txs(mempool_emission.emitted_txs);
161161
wallet.persist(&mut db)?;
162162
println!(
163163
"Applied unconfirmed transactions in {}s",

0 commit comments

Comments
 (0)