Skip to content

Commit 9d6c795

Browse files
feat(esplora): Handle spks with expected txids
Co-authored-by: Wei Chen <[email protected]>
1 parent 86efbb8 commit 9d6c795

File tree

4 files changed

+314
-37
lines changed

4 files changed

+314
-37
lines changed

crates/esplora/src/async_ext.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use async_trait::async_trait;
22
use bdk_core::collections::{BTreeMap, BTreeSet, HashSet};
3-
use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse};
3+
use bdk_core::spk_client::{
4+
FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse,
5+
};
46
use bdk_core::{
5-
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
7+
bitcoin::{BlockHash, OutPoint, Txid},
68
BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate,
79
};
810
use esplora_client::Sleeper;
@@ -62,7 +64,7 @@ where
6264
stop_gap: usize,
6365
parallel_requests: usize,
6466
) -> Result<FullScanResponse<K>, Error> {
65-
let mut request = request.into();
67+
let mut request: FullScanRequest<K> = request.into();
6668
let start_time = request.start_time();
6769
let keychains = request.keychains();
6870

@@ -77,7 +79,9 @@ where
7779
let mut inserted_txs = HashSet::<Txid>::new();
7880
let mut last_active_indices = BTreeMap::<K, u32>::new();
7981
for keychain in keychains {
80-
let keychain_spks = request.iter_spks(keychain.clone());
82+
let keychain_spks = request
83+
.iter_spks(keychain.clone())
84+
.map(|(spk_i, spk)| (spk_i, spk.into()));
8185
let (update, last_active_index) = fetch_txs_with_keychain_spks(
8286
self,
8387
start_time,
@@ -112,7 +116,7 @@ where
112116
request: R,
113117
parallel_requests: usize,
114118
) -> Result<SyncResponse, Error> {
115-
let mut request = request.into();
119+
let mut request: SyncRequest<I> = request.into();
116120
let start_time = request.start_time();
117121

118122
let chain_tip = request.chain_tip();
@@ -129,7 +133,7 @@ where
129133
self,
130134
start_time,
131135
&mut inserted_txs,
132-
request.iter_spks(),
136+
request.iter_spks_with_expected_txids(),
133137
parallel_requests,
134138
)
135139
.await?,
@@ -291,10 +295,10 @@ async fn fetch_txs_with_keychain_spks<I, S>(
291295
parallel_requests: usize,
292296
) -> Result<(TxUpdate<ConfirmationBlockTime>, Option<u32>), Error>
293297
where
294-
I: Iterator<Item = Indexed<ScriptBuf>> + Send,
298+
I: Iterator<Item = Indexed<SpkWithExpectedTxids>> + Send,
295299
S: Sleeper + Clone + Send + Sync,
296300
{
297-
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
301+
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>, HashSet<Txid>);
298302

299303
let mut update = TxUpdate::<ConfirmationBlockTime>::default();
300304
let mut last_index = Option::<u32>::None;
@@ -306,6 +310,8 @@ where
306310
.take(parallel_requests)
307311
.map(|(spk_index, spk)| {
308312
let client = client.clone();
313+
let expected_txids = spk.expected_txids;
314+
let spk = spk.spk;
309315
async move {
310316
let mut last_seen = None;
311317
let mut spk_txs = Vec::new();
@@ -315,9 +321,15 @@ where
315321
last_seen = txs.last().map(|tx| tx.txid);
316322
spk_txs.extend(txs);
317323
if tx_count < 25 {
318-
break Result::<_, Error>::Ok((spk_index, spk_txs));
324+
break;
319325
}
320326
}
327+
let got_txids = spk_txs.iter().map(|tx| tx.txid).collect::<HashSet<_>>();
328+
let evicted_txids = expected_txids
329+
.difference(&got_txids)
330+
.copied()
331+
.collect::<HashSet<_>>();
332+
Result::<TxsOfSpkIndex, Error>::Ok((spk_index, spk_txs, evicted_txids))
321333
}
322334
})
323335
.collect::<FuturesOrdered<_>>();
@@ -326,7 +338,7 @@ where
326338
break;
327339
}
328340

329-
for (index, txs) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
341+
for (index, txs, evicted) in handles.try_collect::<Vec<TxsOfSpkIndex>>().await? {
330342
last_index = Some(index);
331343
if !txs.is_empty() {
332344
last_active_index = Some(index);
@@ -338,6 +350,9 @@ where
338350
insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
339351
insert_prevouts(&mut update, tx.vin);
340352
}
353+
update
354+
.evicted_ats
355+
.extend(evicted.into_iter().map(|txid| (txid, start_time)));
341356
}
342357

343358
let last_index = last_index.expect("Must be set since handles wasn't empty.");
@@ -370,7 +385,7 @@ async fn fetch_txs_with_spks<I, S>(
370385
parallel_requests: usize,
371386
) -> Result<TxUpdate<ConfirmationBlockTime>, Error>
372387
where
373-
I: IntoIterator<Item = ScriptBuf> + Send,
388+
I: IntoIterator<Item = SpkWithExpectedTxids> + Send,
374389
I::IntoIter: Send,
375390
S: Sleeper + Clone + Send + Sync,
376391
{

crates/esplora/src/blocking_ext.rs

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use bdk_core::collections::{BTreeMap, BTreeSet, HashSet};
2-
use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse};
2+
use bdk_core::spk_client::{
3+
FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse,
4+
};
35
use bdk_core::{
4-
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
6+
bitcoin::{BlockHash, OutPoint, Txid},
57
BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate,
68
};
79
use esplora_client::{OutputStatus, Tx};
@@ -53,7 +55,7 @@ impl EsploraExt for esplora_client::BlockingClient {
5355
stop_gap: usize,
5456
parallel_requests: usize,
5557
) -> Result<FullScanResponse<K>, Error> {
56-
let mut request = request.into();
58+
let mut request: FullScanRequest<K> = request.into();
5759
let start_time = request.start_time();
5860

5961
let chain_tip = request.chain_tip();
@@ -67,7 +69,9 @@ impl EsploraExt for esplora_client::BlockingClient {
6769
let mut inserted_txs = HashSet::<Txid>::new();
6870
let mut last_active_indices = BTreeMap::<K, u32>::new();
6971
for keychain in request.keychains() {
70-
let keychain_spks = request.iter_spks(keychain.clone());
72+
let keychain_spks = request
73+
.iter_spks(keychain.clone())
74+
.map(|(spk_i, spk)| (spk_i, spk.into()));
7175
let (update, last_active_index) = fetch_txs_with_keychain_spks(
7276
self,
7377
start_time,
@@ -120,7 +124,7 @@ impl EsploraExt for esplora_client::BlockingClient {
120124
self,
121125
start_time,
122126
&mut inserted_txs,
123-
request.iter_spks(),
127+
request.iter_spks_with_expected_txids(),
124128
parallel_requests,
125129
)?);
126130
tx_update.extend(fetch_txs_with_txids(
@@ -254,15 +258,15 @@ fn chain_update(
254258
Ok(tip)
255259
}
256260

257-
fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
261+
fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<SpkWithExpectedTxids>>>(
258262
client: &esplora_client::BlockingClient,
259263
start_time: u64,
260264
inserted_txs: &mut HashSet<Txid>,
261265
mut keychain_spks: I,
262266
stop_gap: usize,
263267
parallel_requests: usize,
264268
) -> Result<(TxUpdate<ConfirmationBlockTime>, Option<u32>), Error> {
265-
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>);
269+
type TxsOfSpkIndex = (u32, Vec<esplora_client::Tx>, HashSet<Txid>);
266270

267271
let mut update = TxUpdate::<ConfirmationBlockTime>::default();
268272
let mut last_index = Option::<u32>::None;
@@ -273,21 +277,27 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
273277
.by_ref()
274278
.take(parallel_requests)
275279
.map(|(spk_index, spk)| {
276-
std::thread::spawn({
277-
let client = client.clone();
278-
move || -> Result<TxsOfSpkIndex, Error> {
279-
let mut last_seen = None;
280-
let mut spk_txs = Vec::new();
281-
loop {
282-
let txs = client.scripthash_txs(&spk, last_seen)?;
283-
let tx_count = txs.len();
284-
last_seen = txs.last().map(|tx| tx.txid);
285-
spk_txs.extend(txs);
286-
if tx_count < 25 {
287-
break Ok((spk_index, spk_txs));
288-
}
280+
let client = client.clone();
281+
let expected_txids = spk.expected_txids;
282+
let spk = spk.spk;
283+
std::thread::spawn(move || -> Result<TxsOfSpkIndex, Error> {
284+
let mut last_txid = None;
285+
let mut spk_txs = Vec::new();
286+
loop {
287+
let txs = client.scripthash_txs(&spk, last_txid)?;
288+
let tx_count = txs.len();
289+
last_txid = txs.last().map(|tx| tx.txid);
290+
spk_txs.extend(txs);
291+
if tx_count < 25 {
292+
break;
289293
}
290294
}
295+
let got_txids = spk_txs.iter().map(|tx| tx.txid).collect::<HashSet<_>>();
296+
let evicted_txids = expected_txids
297+
.difference(&got_txids)
298+
.copied()
299+
.collect::<HashSet<_>>();
300+
Ok((spk_index, spk_txs, evicted_txids))
291301
})
292302
})
293303
.collect::<Vec<JoinHandle<Result<TxsOfSpkIndex, Error>>>>();
@@ -297,7 +307,7 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
297307
}
298308

299309
for handle in handles {
300-
let (index, txs) = handle.join().expect("thread must not panic")?;
310+
let (index, txs, evicted) = handle.join().expect("thread must not panic")?;
301311
last_index = Some(index);
302312
if !txs.is_empty() {
303313
last_active_index = Some(index);
@@ -309,6 +319,9 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
309319
insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status);
310320
insert_prevouts(&mut update, tx.vin);
311321
}
322+
update
323+
.evicted_ats
324+
.extend(evicted.into_iter().map(|txid| (txid, start_time)));
312325
}
313326

314327
let last_index = last_index.expect("Must be set since handles wasn't empty.");
@@ -333,7 +346,7 @@ fn fetch_txs_with_keychain_spks<I: Iterator<Item = Indexed<ScriptBuf>>>(
333346
/// requests to make in parallel.
334347
///
335348
/// Refer to [crate-level docs](crate) for more.
336-
fn fetch_txs_with_spks<I: IntoIterator<Item = ScriptBuf>>(
349+
fn fetch_txs_with_spks<I: IntoIterator<Item = SpkWithExpectedTxids>>(
337350
client: &esplora_client::BlockingClient,
338351
start_time: u64,
339352
inserted_txs: &mut HashSet<Txid>,

crates/esplora/tests/async_ext.rs

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,139 @@
1+
use bdk_chain::keychain_txout::KeychainTxOutIndex;
2+
use bdk_chain::local_chain::LocalChain;
13
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
2-
use bdk_chain::{ConfirmationBlockTime, TxGraph};
4+
use bdk_chain::{ConfirmationBlockTime, IndexedTxGraph, TxGraph};
5+
use bdk_core::bitcoin::key::Secp256k1;
36
use bdk_esplora::EsploraAsyncExt;
7+
use bdk_testenv::bitcoincore_rpc::json::CreateRawTransactionInput;
8+
use bdk_testenv::bitcoincore_rpc::RawTx;
49
use esplora_client::{self, Builder};
5-
use std::collections::{BTreeSet, HashSet};
10+
use miniscript::Descriptor;
11+
use std::collections::{BTreeSet, HashMap, HashSet};
612
use std::str::FromStr;
713
use std::thread::sleep;
814
use std::time::Duration;
915

1016
use bdk_chain::bitcoin::{Address, Amount};
1117
use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv};
1218

19+
// Ensure that a wallet can detect a malicious replacement of an incoming transaction.
20+
//
21+
// This checks that both the Electrum chain source and the receiving structures properly track the
22+
// replaced transaction as missing.
23+
#[tokio::test]
24+
pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> {
25+
const SEND_TX_FEE: Amount = Amount::from_sat(1000);
26+
const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000);
27+
28+
use bdk_chain::keychain_txout::SyncRequestBuilderExt;
29+
let env = TestEnv::new()?;
30+
let rpc_client = env.rpc_client();
31+
let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap());
32+
let client = Builder::new(base_url.as_str()).build_async()?;
33+
34+
let (receiver_desc, _) = Descriptor::parse_descriptor(&Secp256k1::signing_only(), "tr([73c5da0a/86'/0'/0']xprv9xgqHN7yz9MwCkxsBPN5qetuNdQSUttZNKw1dcYTV4mkaAFiBVGQziHs3NRSWMkCzvgjEe3n9xV8oYywvM8at9yRqyaZVz6TYYhX98VjsUk/0/*)")
35+
.expect("must be valid");
36+
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, _>::new(KeychainTxOutIndex::new(0));
37+
let _ = graph.index.insert_descriptor((), receiver_desc.clone())?;
38+
let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?);
39+
40+
// Derive the receiving address from the descriptor.
41+
let ((_, receiver_spk), _) = graph.index.reveal_next_spk(()).unwrap();
42+
let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?;
43+
44+
env.mine_blocks(101, None)?;
45+
46+
// Select a UTXO to use as an input for constructing our test transactions.
47+
let selected_utxo = rpc_client
48+
.list_unspent(None, None, None, Some(false), None)?
49+
.into_iter()
50+
// Find a block reward tx.
51+
.find(|utxo| utxo.amount == Amount::from_int_btc(50))
52+
.expect("Must find a block reward UTXO");
53+
54+
// Derive the sender's address from the selected UTXO.
55+
let sender_spk = selected_utxo.script_pub_key.clone();
56+
let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest)
57+
.expect("Failed to derive address from UTXO");
58+
59+
// Setup the common inputs used by both `send_tx` and `undo_send_tx`.
60+
let inputs = [CreateRawTransactionInput {
61+
txid: selected_utxo.txid,
62+
vout: selected_utxo.vout,
63+
sequence: None,
64+
}];
65+
66+
// Create and sign the `send_tx` that sends funds to the receiver address.
67+
let send_tx_outputs = HashMap::from([(
68+
receiver_addr.to_string(),
69+
selected_utxo.amount - SEND_TX_FEE,
70+
)]);
71+
let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?;
72+
let send_tx = rpc_client
73+
.sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)?
74+
.transaction()?;
75+
76+
// Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender
77+
// address.
78+
let undo_send_outputs = HashMap::from([(
79+
sender_addr.to_string(),
80+
selected_utxo.amount - UNDO_SEND_TX_FEE,
81+
)]);
82+
let undo_send_tx =
83+
rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?;
84+
let undo_send_tx = rpc_client
85+
.sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)?
86+
.transaction()?;
87+
88+
// Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`.
89+
let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?;
90+
env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?;
91+
let sync_request = SyncRequest::builder()
92+
.chain_tip(chain.tip())
93+
.revealed_spks_from_indexer(&graph.index, ..)
94+
.expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..));
95+
let sync_response = client.sync(sync_request, 1).await?;
96+
assert!(
97+
sync_response
98+
.tx_update
99+
.txs
100+
.iter()
101+
.any(|tx| tx.compute_txid() == send_txid),
102+
"sync response must include the send_tx"
103+
);
104+
let changeset = graph.apply_update(sync_response.tx_update.clone());
105+
assert!(
106+
changeset.tx_graph.txs.contains(&send_tx),
107+
"tx graph must deem send_tx relevant and include it"
108+
);
109+
110+
// Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the
111+
// mempool.
112+
let undo_send_txid = env
113+
.rpc_client()
114+
.send_raw_transaction(undo_send_tx.raw_hex())?;
115+
env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?;
116+
let sync_request = SyncRequest::builder()
117+
.chain_tip(chain.tip())
118+
.revealed_spks_from_indexer(&graph.index, ..)
119+
.expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..));
120+
let sync_response = client.sync(sync_request, 1).await?;
121+
assert!(
122+
sync_response
123+
.tx_update
124+
.evicted_ats
125+
.iter()
126+
.any(|(txid, _)| *txid == send_txid),
127+
"sync response must track send_tx as missing from mempool"
128+
);
129+
let changeset = graph.apply_update(sync_response.tx_update.clone());
130+
assert!(
131+
changeset.tx_graph.last_evicted.contains_key(&send_txid),
132+
"tx graph must track send_tx as missing"
133+
);
134+
135+
Ok(())
136+
}
13137
#[tokio::test]
14138
pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> {
15139
let env = TestEnv::new()?;

0 commit comments

Comments
 (0)