Skip to content

Commit 8513d83

Browse files
evanlinjinLagginTimes
authored andcommitted
feat(bitcoind_rpc)!: Reduce friction of Emitter API.
* Change signature of `Emitter::new` so that `expected_mempool_txids` can be more easily constructed from `TxGraph` methods. * Change generic bounds of `C` within `Emitter<C>` to be `C: DeRef, C::Target: RpcApi`. This allows the caller to have `Arc<Client>` as `C` and does not force to caller to hold a lifetimed reference.
1 parent 0a02d26 commit 8513d83

File tree

3 files changed

+95
-43
lines changed

3 files changed

+95
-43
lines changed

crates/bitcoind_rpc/src/lib.rs

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44
//! [`Emitter`] is the main structure which sources blockchain data from [`bitcoincore_rpc::Client`].
55
//!
66
//! To only get block updates (exclude mempool transactions), the caller can use
7-
//! [`Emitter::next_block`] or/and [`Emitter::next_header`] until it returns `Ok(None)` (which means
8-
//! the chain tip is reached). A separate method, [`Emitter::mempool`] can be used to emit the whole
9-
//! mempool.
7+
//! [`Emitter::next_block`] until it returns `Ok(None)` (which means the chain tip is reached). A
8+
//! separate method, [`Emitter::mempool`] can be used to emit the whole mempool.
109
#![warn(missing_docs)]
1110

1211
use bdk_core::{BlockId, CheckPoint};
1312
use bitcoin::{Block, BlockHash, Transaction, Txid};
14-
use bitcoincore_rpc::bitcoincore_rpc_json;
15-
use std::collections::HashSet;
13+
use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
14+
use std::{collections::HashSet, ops::Deref};
1615

1716
pub mod bip158;
1817

@@ -23,8 +22,8 @@ pub use bitcoincore_rpc;
2322
/// Refer to [module-level documentation] for more.
2423
///
2524
/// [module-level documentation]: crate
26-
pub struct Emitter<'c, C> {
27-
client: &'c C,
25+
pub struct Emitter<C> {
26+
client: C,
2827
start_height: u32,
2928

3029
/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
@@ -56,7 +55,17 @@ pub struct Emitter<'c, C> {
5655
expected_mempool_txids: HashSet<Txid>,
5756
}
5857

59-
impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
58+
/// Indicates that there are no initially expected mempool transactions.
59+
///
60+
/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known
61+
/// to start empty (i.e. with no unconfirmed transactions).
62+
pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty<Txid> = core::iter::empty();
63+
64+
impl<C> Emitter<C>
65+
where
66+
C: Deref,
67+
C::Target: RpcApi,
68+
{
6069
/// Construct a new [`Emitter`].
6170
///
6271
/// `last_cp` informs the emitter of the chain we are starting off with. This way, the emitter
@@ -66,12 +75,13 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
6675
/// original chain).
6776
///
6877
/// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet.
69-
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions.
78+
/// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is
79+
/// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used.
7080
pub fn new(
71-
client: &'c C,
81+
client: C,
7282
last_cp: CheckPoint,
7383
start_height: u32,
74-
expected_mempool_txids: HashSet<Txid>,
84+
expected_mempool_txids: impl IntoIterator<Item = impl Into<Txid>>,
7585
) -> Self {
7686
Self {
7787
client,
@@ -80,7 +90,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
8090
last_block: None,
8191
last_mempool_time: 0,
8292
last_mempool_tip: None,
83-
expected_mempool_txids,
93+
expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(),
8494
}
8595
}
8696

@@ -102,7 +112,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
102112
/// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block
103113
/// at height `h`.
104114
pub fn mempool(&mut self) -> Result<MempoolEvent, bitcoincore_rpc::Error> {
105-
let client = self.client;
115+
let client = &*self.client;
106116

107117
// This is the emitted tip height during the last mempool emission.
108118
let prev_mempool_tip = self
@@ -204,7 +214,7 @@ impl<'c, C: bitcoincore_rpc::RpcApi> Emitter<'c, C> {
204214

205215
/// Emit the next block height and block (if any).
206216
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
207-
if let Some((checkpoint, block)) = poll(self, |hash| self.client.get_block(hash))? {
217+
if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
208218
// Stop tracking unconfirmed transactions that have been confirmed in this block.
209219
for tx in &block.txdata {
210220
self.expected_mempool_txids.remove(&tx.compute_txid());
@@ -247,7 +257,7 @@ impl MempoolEvent {
247257
/// A newly emitted block from [`Emitter`].
248258
#[derive(Debug)]
249259
pub struct BlockEvent<B> {
250-
/// Either a full [`Block`] or [`Header`] of the new block.
260+
/// The block.
251261
pub block: B,
252262

253263
/// The checkpoint of the new block.
@@ -299,9 +309,10 @@ enum PollResponse {
299309

300310
fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
301311
where
302-
C: bitcoincore_rpc::RpcApi,
312+
C: Deref,
313+
C::Target: RpcApi,
303314
{
304-
let client = emitter.client;
315+
let client = &*emitter.client;
305316

306317
if let Some(last_res) = &emitter.last_block {
307318
let next_hash = if last_res.height < emitter.start_height as _ {
@@ -355,15 +366,16 @@ fn poll<C, V, F>(
355366
get_item: F,
356367
) -> Result<Option<(CheckPoint, V)>, bitcoincore_rpc::Error>
357368
where
358-
C: bitcoincore_rpc::RpcApi,
359-
F: Fn(&BlockHash) -> Result<V, bitcoincore_rpc::Error>,
369+
C: Deref,
370+
C::Target: RpcApi,
371+
F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
360372
{
361373
loop {
362374
match poll_once(emitter)? {
363375
PollResponse::Block(res) => {
364376
let height = res.height as u32;
365377
let hash = res.hash;
366-
let item = get_item(&hash)?;
378+
let item = get_item(&hash, &emitter.client)?;
367379

368380
let new_cp = emitter
369381
.last_cp
@@ -432,19 +444,23 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error {
432444

433445
#[cfg(test)]
434446
mod test {
435-
use crate::{bitcoincore_rpc::RpcApi, Emitter};
436-
use bdk_bitcoind_rpc::bitcoincore_rpc::bitcoin::Txid;
447+
use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
437448
use bdk_chain::local_chain::LocalChain;
438449
use bdk_testenv::{anyhow, TestEnv};
439-
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, WScriptHash};
450+
use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash};
440451
use std::collections::HashSet;
441452

442453
#[test]
443454
fn test_expected_mempool_txids_accumulate_and_remove() -> anyhow::Result<()> {
444455
let env = TestEnv::new()?;
445456
let chain = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?).0;
446457
let chain_tip = chain.tip();
447-
let mut emitter = Emitter::new(env.rpc_client(), chain_tip.clone(), 1, HashSet::new());
458+
let mut emitter = Emitter::new(
459+
env.rpc_client(),
460+
chain_tip.clone(),
461+
1,
462+
NO_EXPECTED_MEMPOOL_TXIDS,
463+
);
448464

449465
env.mine_blocks(100, None)?;
450466
while emitter.next_block()?.is_some() {}

crates/bitcoind_rpc/tests/test_emitter.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
use std::collections::{BTreeMap, BTreeSet, HashSet};
1+
use std::{
2+
collections::{BTreeMap, BTreeSet, HashSet},
3+
ops::Deref,
4+
};
25

3-
use bdk_bitcoind_rpc::Emitter;
6+
use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS};
47
use bdk_chain::{
58
bitcoin::{Address, Amount, Txid},
69
local_chain::{CheckPoint, LocalChain},
@@ -22,7 +25,12 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> {
2225
let env = TestEnv::new()?;
2326
let network_tip = env.rpc_client().get_block_count()?;
2427
let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?);
25-
let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0, HashSet::new());
28+
let mut emitter = Emitter::new(
29+
env.rpc_client(),
30+
local_chain.tip(),
31+
0,
32+
NO_EXPECTED_MEMPOOL_TXIDS,
33+
);
2634

2735
// Mine some blocks and return the actual block hashes.
2836
// Because initializing `ElectrsD` already mines some blocks, we must include those too when
@@ -156,7 +164,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> {
156164
index
157165
});
158166

159-
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, HashSet::new());
167+
let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS);
160168

161169
while let Some(emission) = emitter.next_block()? {
162170
let height = emission.block_height();
@@ -252,7 +260,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> {
252260
hash: env.rpc_client().get_block_hash(0)?,
253261
}),
254262
EMITTER_START_HEIGHT as _,
255-
HashSet::new(),
263+
NO_EXPECTED_MEMPOOL_TXIDS,
256264
);
257265

258266
env.mine_blocks(CHAIN_TIP_HEIGHT, None)?;
@@ -292,7 +300,8 @@ fn sync_from_emitter<C>(
292300
emitter: &mut Emitter<C>,
293301
) -> anyhow::Result<()>
294302
where
295-
C: bitcoincore_rpc::RpcApi,
303+
C: Deref,
304+
C::Target: bitcoincore_rpc::RpcApi,
296305
{
297306
while let Some(emission) = emitter.next_block()? {
298307
let height = emission.block_height();
@@ -333,7 +342,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> {
333342
hash: env.rpc_client().get_block_hash(0)?,
334343
}),
335344
0,
336-
HashSet::new(),
345+
NO_EXPECTED_MEMPOOL_TXIDS,
337346
);
338347

339348
// setup addresses
@@ -425,7 +434,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> {
425434
hash: env.rpc_client().get_block_hash(0)?,
426435
}),
427436
0,
428-
HashSet::new(),
437+
NO_EXPECTED_MEMPOOL_TXIDS,
429438
);
430439

431440
// mine blocks and sync up emitter
@@ -492,7 +501,7 @@ fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()
492501
hash: env.rpc_client().get_block_hash(0)?,
493502
}),
494503
0,
495-
HashSet::new(),
504+
NO_EXPECTED_MEMPOOL_TXIDS,
496505
);
497506

498507
// mine blocks to get initial balance, sync emitter up to tip
@@ -584,7 +593,7 @@ fn mempool_during_reorg() -> anyhow::Result<()> {
584593
hash: env.rpc_client().get_block_hash(0)?,
585594
}),
586595
0,
587-
HashSet::new(),
596+
NO_EXPECTED_MEMPOOL_TXIDS,
588597
);
589598

590599
// mine blocks to get initial balance
@@ -712,7 +721,7 @@ fn no_agreement_point() -> anyhow::Result<()> {
712721
hash: env.rpc_client().get_block_hash(0)?,
713722
}),
714723
(PREMINE_COUNT - 2) as u32,
715-
HashSet::new(),
724+
NO_EXPECTED_MEMPOOL_TXIDS,
716725
);
717726

718727
// mine 101 blocks

examples/example_bitcoind_rpc_polling/src/main.rs

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::{
2-
collections::HashSet,
32
path::PathBuf,
43
sync::{
54
atomic::{AtomicBool, Ordering},
@@ -137,9 +136,24 @@ fn main() -> anyhow::Result<()> {
137136
fallback_height, ..
138137
} = rpc_args;
139138

140-
let chain_tip = chain.lock().unwrap().tip();
141139
let rpc_client = rpc_args.new_client()?;
142-
let mut emitter = Emitter::new(&rpc_client, chain_tip, fallback_height, HashSet::new());
140+
let mut emitter = {
141+
let chain = chain.lock().unwrap();
142+
let graph = graph.lock().unwrap();
143+
Emitter::new(
144+
&rpc_client,
145+
chain.tip(),
146+
fallback_height,
147+
graph
148+
.graph()
149+
.list_canonical_txs(
150+
&*chain,
151+
chain.tip().block_id(),
152+
CanonicalizationParams::default(),
153+
)
154+
.filter(|tx| tx.chain_position.is_unconfirmed()),
155+
)
156+
};
143157
let mut db_stage = ChangeSet::default();
144158

145159
let mut last_db_commit = Instant::now();
@@ -222,18 +236,31 @@ fn main() -> anyhow::Result<()> {
222236
} = rpc_args;
223237
let sigterm_flag = start_ctrlc_handler();
224238

225-
let last_cp = chain.lock().unwrap().tip();
239+
let rpc_client = Arc::new(rpc_args.new_client()?);
240+
let mut emitter = {
241+
let chain = chain.lock().unwrap();
242+
let graph = graph.lock().unwrap();
243+
Emitter::new(
244+
rpc_client.clone(),
245+
chain.tip(),
246+
fallback_height,
247+
graph
248+
.graph()
249+
.list_canonical_txs(
250+
&*chain,
251+
chain.tip().block_id(),
252+
CanonicalizationParams::default(),
253+
)
254+
.filter(|tx| tx.chain_position.is_unconfirmed()),
255+
)
256+
};
226257

227258
println!(
228259
"[{:>10}s] starting emitter thread...",
229260
start.elapsed().as_secs_f32()
230261
);
231262
let (tx, rx) = std::sync::mpsc::sync_channel::<Emission>(CHANNEL_BOUND);
232263
let emission_jh = std::thread::spawn(move || -> anyhow::Result<()> {
233-
let rpc_client = rpc_args.new_client()?;
234-
let mut emitter =
235-
Emitter::new(&rpc_client, last_cp, fallback_height, HashSet::new());
236-
237264
let mut block_count = rpc_client.get_block_count()? as u32;
238265
tx.send(Emission::Tip(block_count))?;
239266

0 commit comments

Comments
 (0)