diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index c287b88c6..cd45ff29e 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -96,7 +96,7 @@ jobs: - name: Check esplora working-directory: ./crates/esplora # TODO "--target thumbv6m-none-eabi" should work but currently does not - run: cargo check --no-default-features --features miniscript/no-std,bdk_chain/hashbrown + run: cargo check --no-default-features --features bdk_chain/hashbrown check-wasm: needs: prepare @@ -128,7 +128,7 @@ jobs: run: cargo check --target wasm32-unknown-unknown --no-default-features --features miniscript/no-std,bdk_chain/hashbrown - name: Check esplora working-directory: ./crates/esplora - run: cargo check --target wasm32-unknown-unknown --no-default-features --features miniscript/no-std,bdk_chain/hashbrown,async + run: cargo check --target wasm32-unknown-unknown --no-default-features --features bdk_core/hashbrown,async fmt: needs: prepare diff --git a/crates/chain/src/indexed_tx_graph.rs b/crates/chain/src/indexed_tx_graph.rs index 45ed92aee..bcd6ac3fc 100644 --- a/crates/chain/src/indexed_tx_graph.rs +++ b/crates/chain/src/indexed_tx_graph.rs @@ -1,13 +1,18 @@ //! Contains the [`IndexedTxGraph`] and associated types. Refer to the //! [`IndexedTxGraph`] documentation for more. -use core::fmt::Debug; +use core::{ + convert::Infallible, + fmt::{self, Debug}, + ops::RangeBounds, +}; use alloc::{sync::Arc, vec::Vec}; -use bitcoin::{Block, OutPoint, Transaction, TxOut, Txid}; +use bitcoin::{Block, OutPoint, ScriptBuf, Transaction, TxOut, Txid}; use crate::{ + spk_txout::SpkTxOutIndex, tx_graph::{self, TxGraph}, - Anchor, BlockId, Indexer, Merge, TxPosInBlock, + Anchor, BlockId, ChainOracle, Indexer, Merge, TxPosInBlock, }; /// The [`IndexedTxGraph`] combines a [`TxGraph`] and an [`Indexer`] implementation. @@ -127,6 +132,19 @@ where self.graph.insert_seen_at(txid, seen_at).into() } + /// Inserts the given `evicted_at` for `txid`. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet { + let tx_graph = self.graph.insert_evicted_at(txid, evicted_at); + ChangeSet { + tx_graph, + ..Default::default() + } + } + /// Batch insert transactions, filtering out those that are irrelevant. /// /// Relevancy is determined by the [`Indexer::is_tx_relevant`] implementation of `I`. Irrelevant @@ -301,6 +319,58 @@ where } } +impl IndexedTxGraph +where + A: Anchor, +{ + /// List txids that are expected to exist under the given spks. + /// + /// This is used to fill [`SyncRequestBuilder::expected_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_spk_txids). + /// + /// The spk index range can be contrained with `range`. + /// + /// # Error + /// + /// If the [`ChainOracle`] implementation (`chain`) fails, an error will be returned with the + /// returned item. + /// + /// If the [`ChainOracle`] is infallible, + /// [`list_expected_spk_txids`](Self::list_expected_spk_txids) can be used instead. + pub fn try_list_expected_spk_txids<'a, C, I>( + &'a self, + chain: &'a C, + chain_tip: BlockId, + spk_index_range: impl RangeBounds + 'a, + ) -> impl Iterator> + 'a + where + C: ChainOracle, + X: AsRef> + 'a, + I: fmt::Debug + Clone + Ord + 'a, + { + self.graph + .try_list_expected_spk_txids(chain, chain_tip, &self.index, spk_index_range) + } + + /// List txids that are expected to exist under the given spks. + /// + /// This is the infallible version of + /// [`try_list_expected_spk_txids`](Self::try_list_expected_spk_txids). + pub fn list_expected_spk_txids<'a, C, I>( + &'a self, + chain: &'a C, + chain_tip: BlockId, + spk_index_range: impl RangeBounds + 'a, + ) -> impl Iterator + 'a + where + C: ChainOracle, + X: AsRef> + 'a, + I: fmt::Debug + Clone + Ord + 'a, + { + self.try_list_expected_spk_txids(chain, chain_tip, spk_index_range) + .map(|r| r.expect("infallible")) + } +} + impl AsRef> for IndexedTxGraph { fn as_ref(&self) -> &TxGraph { &self.graph diff --git a/crates/chain/src/indexer/keychain_txout.rs b/crates/chain/src/indexer/keychain_txout.rs index 4543027cc..bdea4b82d 100644 --- a/crates/chain/src/indexer/keychain_txout.rs +++ b/crates/chain/src/indexer/keychain_txout.rs @@ -136,6 +136,12 @@ impl Default for KeychainTxOutIndex { } } +impl AsRef> for KeychainTxOutIndex { + fn as_ref(&self) -> &SpkTxOutIndex<(K, u32)> { + &self.inner + } +} + impl Indexer for KeychainTxOutIndex { type ChangeSet = ChangeSet; @@ -200,6 +206,11 @@ impl KeychainTxOutIndex { lookahead, } } + + /// Get a reference to the internal [`SpkTxOutIndex`]. + pub fn inner(&self) -> &SpkTxOutIndex<(K, u32)> { + &self.inner + } } /// Methods that are *re-exposed* from the internal [`SpkTxOutIndex`]. diff --git a/crates/chain/src/indexer/spk_txout.rs b/crates/chain/src/indexer/spk_txout.rs index 286e5d2dc..6378dbb79 100644 --- a/crates/chain/src/indexer/spk_txout.rs +++ b/crates/chain/src/indexer/spk_txout.rs @@ -54,6 +54,12 @@ impl Default for SpkTxOutIndex { } } +impl AsRef> for SpkTxOutIndex { + fn as_ref(&self) -> &SpkTxOutIndex { + self + } +} + impl Indexer for SpkTxOutIndex { type ChangeSet = (); @@ -334,4 +340,24 @@ impl SpkTxOutIndex { .any(|output| self.spk_indices.contains_key(&output.script_pubkey)); input_matches || output_matches } + + /// Find relevant script pubkeys associated with a transaction for tracking and validation. + /// + /// Returns a set of script pubkeys from [`SpkTxOutIndex`] that are relevant to the outputs and + /// previous outputs of a given transaction. Inputs are only considered relevant if the parent + /// transactions have been scanned. + pub fn relevant_spks_of_tx(&self, tx: &Transaction) -> BTreeSet<(I, ScriptBuf)> { + let spks_from_inputs = tx.input.iter().filter_map(|txin| { + self.txouts + .get(&txin.previous_output) + .cloned() + .map(|(i, prev_txo)| (i, prev_txo.script_pubkey)) + }); + let spks_from_outputs = tx + .output + .iter() + .filter_map(|txout| self.spk_indices.get_key_value(&txout.script_pubkey)) + .map(|(spk, i)| (i.clone(), spk.clone())); + spks_from_inputs.chain(spks_from_outputs).collect() + } } diff --git a/crates/chain/src/rusqlite_impl.rs b/crates/chain/src/rusqlite_impl.rs index 7b39f53c0..3bc105d0b 100644 --- a/crates/chain/src/rusqlite_impl.rs +++ b/crates/chain/src/rusqlite_impl.rs @@ -264,12 +264,20 @@ impl tx_graph::ChangeSet { format!("{add_confirmation_time_column}; {extract_confirmation_time_from_anchor_column}; {drop_anchor_column}") } + /// Get v2 of sqlite [tx_graph::ChangeSet] schema + pub fn schema_v2() -> String { + format!( + "ALTER TABLE {} ADD COLUMN last_evicted INTEGER", + Self::TXS_TABLE_NAME, + ) + } + /// Initialize sqlite tables. pub fn init_sqlite_tables(db_tx: &rusqlite::Transaction) -> rusqlite::Result<()> { migrate_schema( db_tx, Self::SCHEMA_NAME, - &[&Self::schema_v0(), &Self::schema_v1()], + &[&Self::schema_v0(), &Self::schema_v1(), &Self::schema_v2()], ) } @@ -280,7 +288,7 @@ impl tx_graph::ChangeSet { let mut changeset = Self::default(); let mut statement = db_tx.prepare(&format!( - "SELECT txid, raw_tx, last_seen FROM {}", + "SELECT txid, raw_tx, last_seen, last_evicted FROM {}", Self::TXS_TABLE_NAME, ))?; let row_iter = statement.query_map([], |row| { @@ -288,16 +296,20 @@ impl tx_graph::ChangeSet { row.get::<_, Impl>("txid")?, row.get::<_, Option>>("raw_tx")?, row.get::<_, Option>("last_seen")?, + row.get::<_, Option>("last_evicted")?, )) })?; for row in row_iter { - let (Impl(txid), tx, last_seen) = row?; + let (Impl(txid), tx, last_seen, last_evicted) = row?; if let Some(Impl(tx)) = tx { changeset.txs.insert(Arc::new(tx)); } if let Some(last_seen) = last_seen { changeset.last_seen.insert(txid, last_seen); } + if let Some(last_evicted) = last_evicted { + changeset.last_evicted.insert(txid, last_evicted); + } } let mut statement = db_tx.prepare(&format!( @@ -377,6 +389,19 @@ impl tx_graph::ChangeSet { })?; } + let mut statement = db_tx + .prepare_cached(&format!( + "INSERT INTO {}(txid, last_evicted) VALUES(:txid, :last_evicted) ON CONFLICT(txid) DO UPDATE SET last_evicted=:last_evicted", + Self::TXS_TABLE_NAME, + ))?; + for (&txid, &last_evicted) in &self.last_evicted { + let checked_time = last_evicted.to_sql()?; + statement.execute(named_params! { + ":txid": Impl(txid), + ":last_evicted": Some(checked_time), + })?; + } + let mut statement = db_tx.prepare_cached(&format!( "REPLACE INTO {}(txid, vout, value, script) VALUES(:txid, :vout, :value, :script)", Self::TXOUTS_TABLE_NAME, @@ -628,7 +653,7 @@ mod test { } #[test] - fn v0_to_v1_schema_migration_is_backward_compatible() -> anyhow::Result<()> { + fn v0_to_v2_schema_migration_is_backward_compatible() -> anyhow::Result<()> { type ChangeSet = tx_graph::ChangeSet; let mut conn = rusqlite::Connection::open_in_memory()?; @@ -697,13 +722,17 @@ mod test { } } - // Apply v1 sqlite schema to tables with data + // Apply v1 & v2 sqlite schema to tables with data { let db_tx = conn.transaction()?; migrate_schema( &db_tx, ChangeSet::SCHEMA_NAME, - &[&ChangeSet::schema_v0(), &ChangeSet::schema_v1()], + &[ + &ChangeSet::schema_v0(), + &ChangeSet::schema_v1(), + &ChangeSet::schema_v2(), + ], )?; db_tx.commit()?; } @@ -718,4 +747,43 @@ mod test { Ok(()) } + + #[test] + fn can_persist_last_evicted() -> anyhow::Result<()> { + use bitcoin::hashes::Hash; + + type ChangeSet = tx_graph::ChangeSet; + let mut conn = rusqlite::Connection::open_in_memory()?; + + // Init tables + { + let db_tx = conn.transaction()?; + ChangeSet::init_sqlite_tables(&db_tx)?; + db_tx.commit()?; + } + + let txid = bitcoin::Txid::all_zeros(); + let last_evicted = 100; + + // Persist `last_evicted` + { + let changeset = ChangeSet { + last_evicted: [(txid, last_evicted)].into(), + ..Default::default() + }; + let db_tx = conn.transaction()?; + changeset.persist_to_sqlite(&db_tx)?; + db_tx.commit()?; + } + + // Load from sqlite should succeed + { + let db_tx = conn.transaction()?; + let changeset = ChangeSet::from_sqlite(&db_tx)?; + db_tx.commit()?; + assert_eq!(changeset.last_evicted.get(&txid), Some(&last_evicted)); + } + + Ok(()) + } } diff --git a/crates/chain/src/tx_graph.rs b/crates/chain/src/tx_graph.rs index d40ee49d3..4c2990351 100644 --- a/crates/chain/src/tx_graph.rs +++ b/crates/chain/src/tx_graph.rs @@ -16,23 +16,52 @@ //! documentation for more details), and the timestamp of the last time we saw the transaction as //! unconfirmed. //! -//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. This is useful for -//! identifying and traversing conflicts and descendants of a given transaction. Some [`TxGraph`] -//! methods only consider transactions that are "canonical" (i.e., in the best chain or in mempool). -//! We decide which transactions are canonical based on the transaction's anchors and the -//! `last_seen` (as unconfirmed) timestamp. +//! # Canonicalization //! -//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to -//! persistent storage, or to be applied to another [`TxGraph`]. +//! Conflicting transactions are allowed to coexist within a [`TxGraph`]. A process called +//! canonicalization is required to get a conflict-free view of transactions. +//! +//! * [`list_canonical_txs`](TxGraph::list_canonical_txs) lists canonical transactions. +//! * [`filter_chain_txouts`](TxGraph::filter_chain_txouts) filters out canonical outputs from a +//! list of outpoints. +//! * [`filter_chain_unspents`](TxGraph::filter_chain_unspents) filters out canonical unspent +//! outputs from a list of outpoints. +//! * [`balance`](TxGraph::balance) gets the total sum of unspent outputs filtered from a list of +//! outpoints. +//! * [`canonical_iter`](TxGraph::canonical_iter) returns the [`CanonicalIter`] which contains all +//! of the canonicalization logic. +//! +//! All these methods require a `chain` and `chain_tip` argument. The `chain` must be a +//! [`ChainOracle`] implementation (such as [`LocalChain`](crate::local_chain::LocalChain)) which +//! identifies which blocks exist under a given `chain_tip`. //! -//! Lastly, you can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of -//! a given transaction, respectively. +//! The canonicalization algorithm uses the following associated data to determine which +//! transactions have precedence over others: +//! +//! * [`Anchor`] - This bit of data represents that a transaction is anchored in a given block. If +//! the transaction is anchored in chain of `chain_tip`, or is an ancestor of a transaction +//! anchored in chain of `chain_tip`, then the transaction must be canonical. +//! * `last_seen` - This is the timestamp of when a transaction is last-seen in the mempool. This +//! value is updated by [`insert_seen_at`](TxGraph::insert_seen_at) and +//! [`apply_update`](TxGraph::apply_update). Transactions that are seen later have higher +//! priority than those that are seen earlier. `last_seen` values are transitive. This means +//! that the actual `last_seen` value of a transaction is the max of all the `last_seen` values +//! from it's descendants. +//! * `last_evicted` - This is the timestamp of when a transaction last went missing from the +//! mempool. If this value is equal to or higher than the transaction's `last_seen` value, then +//! it will not be considered canonical. +//! +//! # Graph traversal +//! +//! You can use [`TxAncestors`]/[`TxDescendants`] to traverse ancestors and descendants of a given +//! transaction, respectively. //! //! # Applying changes //! +//! The [`ChangeSet`] reports changes made to a [`TxGraph`]; it can be used to either save to +//! persistent storage, or to be applied to another [`TxGraph`]. +//! //! Methods that change the state of [`TxGraph`] will return [`ChangeSet`]s. -//! [`ChangeSet`]s can be applied back to a [`TxGraph`] or be used to inform persistent storage -//! of the changes to [`TxGraph`]. //! //! # Generics //! @@ -91,6 +120,7 @@ //! [`insert_txout`]: TxGraph::insert_txout use crate::collections::*; +use crate::spk_txout::SpkTxOutIndex; use crate::BlockId; use crate::CanonicalIter; use crate::CanonicalReason; @@ -103,6 +133,7 @@ use bdk_core::ConfirmationBlockTime; pub use bdk_core::TxUpdate; use bitcoin::{Amount, OutPoint, ScriptBuf, SignedAmount, Transaction, TxOut, Txid}; use core::fmt::{self, Formatter}; +use core::ops::RangeBounds; use core::{ convert::Infallible, ops::{Deref, RangeInclusive}, @@ -122,6 +153,7 @@ impl From> for TxUpdate { .flat_map(|(txid, anchors)| anchors.into_iter().map(move |a| (a, txid))) .collect(); tx_update.seen_ats = graph.last_seen.into_iter().collect(); + tx_update.evicted_ats = graph.last_evicted.into_iter().collect(); tx_update } } @@ -145,6 +177,7 @@ pub struct TxGraph { spends: BTreeMap>, anchors: HashMap>, last_seen: HashMap, + last_evicted: HashMap, txs_by_highest_conf_heights: BTreeSet<(u32, Txid)>, txs_by_last_seen: BTreeSet<(u64, Txid)>, @@ -162,6 +195,7 @@ impl Default for TxGraph { spends: Default::default(), anchors: Default::default(), last_seen: Default::default(), + last_evicted: Default::default(), txs_by_highest_conf_heights: Default::default(), txs_by_last_seen: Default::default(), empty_outspends: Default::default(), @@ -715,6 +749,34 @@ impl TxGraph { changeset } + /// Inserts the given `evicted_at` for `txid` into [`TxGraph`]. + /// + /// The `evicted_at` timestamp represents the last known time when the transaction was observed + /// to be missing from the mempool. If `txid` was previously recorded with an earlier + /// `evicted_at` value, it is updated only if the new value is greater. + pub fn insert_evicted_at(&mut self, txid: Txid, evicted_at: u64) -> ChangeSet { + let is_changed = match self.last_evicted.entry(txid) { + hash_map::Entry::Occupied(mut e) => { + let last_evicted = e.get_mut(); + let change = *last_evicted < evicted_at; + if change { + *last_evicted = evicted_at; + } + change + } + hash_map::Entry::Vacant(e) => { + e.insert(evicted_at); + true + } + }; + + let mut changeset = ChangeSet::::default(); + if is_changed { + changeset.last_evicted.insert(txid, evicted_at); + } + changeset + } + /// Extends this graph with the given `update`. /// /// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that @@ -733,6 +795,9 @@ impl TxGraph { for (txid, seen_at) in update.seen_ats { changeset.merge(self.insert_seen_at(txid, seen_at)); } + for (txid, evicted_at) in update.evicted_ats { + changeset.merge(self.insert_evicted_at(txid, evicted_at)); + } changeset } @@ -750,6 +815,7 @@ impl TxGraph { .flat_map(|(txid, anchors)| anchors.iter().map(|a| (a.clone(), *txid))) .collect(), last_seen: self.last_seen.iter().map(|(&k, &v)| (k, v)).collect(), + last_evicted: self.last_evicted.iter().map(|(&k, &v)| (k, v)).collect(), } } @@ -767,6 +833,9 @@ impl TxGraph { for (txid, seen_at) in changeset.last_seen { let _ = self.insert_seen_at(txid, seen_at); } + for (txid, evicted_at) in changeset.last_evicted { + let _ = self.insert_evicted_at(txid, evicted_at); + } } } @@ -937,9 +1006,17 @@ impl TxGraph { /// List txids by descending last-seen order. /// - /// Transactions without last-seens are excluded. - pub fn txids_by_descending_last_seen(&self) -> impl ExactSizeIterator + '_ { - self.txs_by_last_seen.iter().copied().rev() + /// Transactions without last-seens are excluded. Transactions with a last-evicted timestamp + /// equal or higher than it's last-seen timestamp are excluded. + pub fn txids_by_descending_last_seen(&self) -> impl Iterator + '_ { + self.txs_by_last_seen + .iter() + .copied() + .rev() + .filter(|(last_seen, txid)| match self.last_evicted.get(txid) { + Some(last_evicted) => last_evicted < last_seen, + None => true, + }) } /// Returns a [`CanonicalIter`]. @@ -1078,6 +1155,67 @@ impl TxGraph { self.try_balance(chain, chain_tip, outpoints, trust_predicate) .expect("oracle is infallible") } + + /// List txids that are expected to exist under the given spks. + /// + /// This is used to fill [`SyncRequestBuilder::expected_spk_txids`](bdk_core::spk_client::SyncRequestBuilder::expected_spk_txids). + /// + /// The spk index range can be constrained with `range`. + /// + /// # Error + /// + /// If the [`ChainOracle`] implementation (`chain`) fails, an error will be returned with the + /// returned item. + /// + /// If the [`ChainOracle`] is infallible, + /// [`list_expected_spk_txids`](Self::list_expected_spk_txids) can be used instead. + pub fn try_list_expected_spk_txids<'a, C, I>( + &'a self, + chain: &'a C, + chain_tip: BlockId, + indexer: &'a impl AsRef>, + spk_index_range: impl RangeBounds + 'a, + ) -> impl Iterator> + 'a + where + C: ChainOracle, + I: fmt::Debug + Clone + Ord + 'a, + { + let indexer = indexer.as_ref(); + self.try_list_canonical_txs(chain, chain_tip).flat_map( + move |res| -> Vec> { + let range = &spk_index_range; + let c_tx = match res { + Ok(c_tx) => c_tx, + Err(err) => return vec![Err(err)], + }; + let relevant_spks = indexer.relevant_spks_of_tx(&c_tx.tx_node); + relevant_spks + .into_iter() + .filter(|(i, _)| range.contains(i)) + .map(|(_, spk)| Ok((spk, c_tx.tx_node.txid))) + .collect() + }, + ) + } + + /// List txids that are expected to exist under the given spks. + /// + /// This is the infallible version of + /// [`try_list_expected_spk_txids`](Self::try_list_expected_spk_txids). + pub fn list_expected_spk_txids<'a, C, I>( + &'a self, + chain: &'a C, + chain_tip: BlockId, + indexer: &'a impl AsRef>, + spk_index_range: impl RangeBounds + 'a, + ) -> impl Iterator + 'a + where + C: ChainOracle, + I: fmt::Debug + Clone + Ord + 'a, + { + self.try_list_expected_spk_txids(chain, chain_tip, indexer, spk_index_range) + .map(|r| r.expect("infallible")) + } } /// The [`ChangeSet`] represents changes to a [`TxGraph`]. @@ -1107,6 +1245,9 @@ pub struct ChangeSet { pub anchors: BTreeSet<(A, Txid)>, /// Added last-seen unix timestamps of transactions. pub last_seen: BTreeMap, + /// Added timestamps of when a transaction is last evicted from the mempool. + #[cfg_attr(feature = "serde", serde(default))] + pub last_evicted: BTreeMap, } impl Default for ChangeSet { @@ -1116,6 +1257,7 @@ impl Default for ChangeSet { txouts: Default::default(), anchors: Default::default(), last_seen: Default::default(), + last_evicted: Default::default(), } } } @@ -1170,6 +1312,14 @@ impl Merge for ChangeSet { .filter(|(txid, update_ls)| self.last_seen.get(txid) < Some(update_ls)) .collect::>(), ); + // last_evicted timestamps should only increase + self.last_evicted.extend( + other + .last_evicted + .into_iter() + .filter(|(txid, update_lm)| self.last_evicted.get(txid) < Some(update_lm)) + .collect::>(), + ); } fn is_empty(&self) -> bool { @@ -1177,6 +1327,7 @@ impl Merge for ChangeSet { && self.txouts.is_empty() && self.anchors.is_empty() && self.last_seen.is_empty() + && self.last_evicted.is_empty() } } @@ -1196,6 +1347,7 @@ impl ChangeSet { self.anchors.into_iter().map(|(a, txid)| (f(a), txid)), ), last_seen: self.last_seen, + last_evicted: self.last_evicted, } } } diff --git a/crates/chain/tests/test_tx_graph.rs b/crates/chain/tests/test_tx_graph.rs index eef5e2239..446147821 100644 --- a/crates/chain/tests/test_tx_graph.rs +++ b/crates/chain/tests/test_tx_graph.rs @@ -115,7 +115,8 @@ fn insert_txouts() { txs: [Arc::new(update_tx.clone())].into(), txouts: update_ops.clone().into(), anchors: [(conf_anchor, update_tx.compute_txid()),].into(), - last_seen: [(hash!("tx2"), 1000000)].into() + last_seen: [(hash!("tx2"), 1000000)].into(), + last_evicted: [].into(), } ); @@ -168,7 +169,8 @@ fn insert_txouts() { txs: [Arc::new(update_tx.clone())].into(), txouts: update_ops.into_iter().chain(original_ops).collect(), anchors: [(conf_anchor, update_tx.compute_txid()),].into(), - last_seen: [(hash!("tx2"), 1000000)].into() + last_seen: [(hash!("tx2"), 1000000)].into(), + last_evicted: [].into(), } ); } diff --git a/crates/core/src/spk_client.rs b/crates/core/src/spk_client.rs index dce3b7ae1..b6a8e0204 100644 --- a/crates/core/src/spk_client.rs +++ b/crates/core/src/spk_client.rs @@ -1,7 +1,7 @@ //! Helper types for spk-based blockchain clients. use crate::{ alloc::{boxed::Box, collections::VecDeque, vec::Vec}, - collections::BTreeMap, + collections::{BTreeMap, HashMap, HashSet}, CheckPoint, ConfirmationBlockTime, Indexed, }; use bitcoin::{OutPoint, Script, ScriptBuf, Txid}; @@ -86,6 +86,28 @@ impl SyncProgress { } } +/// [`Script`] with expected [`Txid`] histories. +#[derive(Debug, Clone)] +pub struct SpkWithExpectedTxids { + /// Script pubkey. + pub spk: ScriptBuf, + + /// [`Txid`]s that we expect to appear in the chain source's spk history response. + /// + /// Any transaction listed here that is missing from the spk history response should be + /// considered evicted from the mempool. + pub expected_txids: HashSet, +} + +impl From for SpkWithExpectedTxids { + fn from(spk: ScriptBuf) -> Self { + Self { + spk, + expected_txids: HashSet::new(), + } + } +} + /// Builds a [`SyncRequest`]. /// /// Construct with [`SyncRequest::builder`]. @@ -153,6 +175,20 @@ impl SyncRequestBuilder { self } + /// Add transactions that are expected to exist under the given spks. + /// + /// This is useful for detecting a malicious replacement of an incoming transaction. + pub fn expected_spk_txids(mut self, txs: impl IntoIterator) -> Self { + for (spk, txid) in txs { + self.inner + .spk_expected_txids + .entry(spk) + .or_default() + .insert(txid); + } + self + } + /// Add [`Txid`]s that will be synced against. pub fn txids(mut self, txids: impl IntoIterator) -> Self { self.inner.txids.extend(txids); @@ -208,6 +244,7 @@ pub struct SyncRequest { chain_tip: Option, spks: VecDeque<(I, ScriptBuf)>, spks_consumed: usize, + spk_expected_txids: HashMap>, txids: VecDeque, txids_consumed: usize, outpoints: VecDeque, @@ -237,6 +274,7 @@ impl SyncRequest { chain_tip: None, spks: VecDeque::new(), spks_consumed: 0, + spk_expected_txids: HashMap::new(), txids: VecDeque::new(), txids_consumed: 0, outpoints: VecDeque::new(), @@ -282,14 +320,23 @@ impl SyncRequest { self.chain_tip.clone() } - /// Advances the sync request and returns the next [`ScriptBuf`]. + /// Advances the sync request and returns the next [`ScriptBuf`] with corresponding [`Txid`] + /// history. /// /// Returns [`None`] when there are no more scripts remaining in the request. - pub fn next_spk(&mut self) -> Option { - let (i, spk) = self.spks.pop_front()?; + pub fn next_spk_with_expected_txids(&mut self) -> Option { + let (i, next_spk) = self.spks.pop_front()?; self.spks_consumed += 1; - self._call_inspect(SyncItem::Spk(i, spk.as_script())); - Some(spk) + self._call_inspect(SyncItem::Spk(i, next_spk.as_script())); + let spk_history = self + .spk_expected_txids + .get(&next_spk) + .cloned() + .unwrap_or_default(); + Some(SpkWithExpectedTxids { + spk: next_spk, + expected_txids: spk_history, + }) } /// Advances the sync request and returns the next [`Txid`]. @@ -312,9 +359,11 @@ impl SyncRequest { Some(outpoint) } - /// Iterate over [`ScriptBuf`]s contained in this request. - pub fn iter_spks(&mut self) -> impl ExactSizeIterator + '_ { - SyncIter::::new(self) + /// Iterate over [`ScriptBuf`]s with corresponding [`Txid`] histories contained in this request. + pub fn iter_spks_with_expected_txids( + &mut self, + ) -> impl ExactSizeIterator + '_ { + SyncIter::::new(self) } /// Iterate over [`Txid`]s contained in this request. @@ -543,11 +592,11 @@ impl<'r, I, Item> SyncIter<'r, I, Item> { impl<'r, I, Item> ExactSizeIterator for SyncIter<'r, I, Item> where SyncIter<'r, I, Item>: Iterator {} -impl Iterator for SyncIter<'_, I, ScriptBuf> { - type Item = ScriptBuf; +impl Iterator for SyncIter<'_, I, SpkWithExpectedTxids> { + type Item = SpkWithExpectedTxids; fn next(&mut self) -> Option { - self.request.next_spk() + self.request.next_spk_with_expected_txids() } fn size_hint(&self) -> (usize, Option) { diff --git a/crates/core/src/tx_update.rs b/crates/core/src/tx_update.rs index 0b548313a..89a224fbd 100644 --- a/crates/core/src/tx_update.rs +++ b/crates/core/src/tx_update.rs @@ -44,6 +44,12 @@ pub struct TxUpdate { /// [`SyncRequest::start_time`](crate::spk_client::SyncRequest::start_time) can be used to /// provide the `seen_at` value. pub seen_ats: HashSet<(Txid, u64)>, + + /// When transactions were discovered to be missing (evicted) from the mempool. + /// + /// [`SyncRequest::start_time`](crate::spk_client::SyncRequest::start_time) can be used to + /// provide the `evicted_at` value. + pub evicted_ats: HashSet<(Txid, u64)>, } impl Default for TxUpdate { @@ -53,6 +59,7 @@ impl Default for TxUpdate { txouts: Default::default(), anchors: Default::default(), seen_ats: Default::default(), + evicted_ats: Default::default(), } } } @@ -72,6 +79,7 @@ impl TxUpdate { .map(|(a, txid)| (map(a), txid)) .collect(), seen_ats: self.seen_ats, + evicted_ats: self.evicted_ats, } } @@ -81,5 +89,6 @@ impl TxUpdate { self.txouts.extend(other.txouts); self.anchors.extend(other.anchors); self.seen_ats.extend(other.seen_ats); + self.evicted_ats.extend(other.evicted_ats); } } diff --git a/crates/electrum/src/bdk_electrum_client.rs b/crates/electrum/src/bdk_electrum_client.rs index 163854ad3..fb387bb39 100644 --- a/crates/electrum/src/bdk_electrum_client.rs +++ b/crates/electrum/src/bdk_electrum_client.rs @@ -1,14 +1,13 @@ use bdk_core::{ - bitcoin::{block::Header, BlockHash, OutPoint, ScriptBuf, Transaction, Txid}, - collections::{BTreeMap, HashMap}, - spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}, + bitcoin::{block::Header, BlockHash, OutPoint, Transaction, Txid}, + collections::{BTreeMap, HashMap, HashSet}, + spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, + }, BlockId, CheckPoint, ConfirmationBlockTime, TxUpdate, }; use electrum_client::{ElectrumApi, Error, HeaderNotification}; -use std::{ - collections::HashSet, - sync::{Arc, Mutex}, -}; +use std::sync::{Arc, Mutex}; /// We include a chain suffix of a certain length for the purpose of robustness. const CHAIN_SUFFIX_LENGTH: u32 = 8; @@ -138,7 +137,9 @@ impl BdkElectrumClient { let mut tx_update = TxUpdate::::default(); let mut last_active_indices = BTreeMap::::default(); for keychain in request.keychains() { - let spks = request.iter_spks(keychain.clone()); + let spks = request + .iter_spks(keychain.clone()) + .map(|(spk_i, spk)| (spk_i, SpkWithExpectedTxids::from(spk))); if let Some(last_active_index) = self.populate_with_spks(start_time, &mut tx_update, spks, stop_gap, batch_size)? { @@ -209,7 +210,7 @@ impl BdkElectrumClient { start_time, &mut tx_update, request - .iter_spks() + .iter_spks_with_expected_txids() .enumerate() .map(|(i, spk)| (i as u32, spk)), usize::MAX, @@ -247,7 +248,7 @@ impl BdkElectrumClient { &self, start_time: u64, tx_update: &mut TxUpdate, - mut spks: impl Iterator, + mut spks_with_expected_txids: impl Iterator, stop_gap: usize, batch_size: usize, ) -> Result, Error> { @@ -256,7 +257,7 @@ impl BdkElectrumClient { loop { let spks = (0..batch_size) - .map_while(|_| spks.next()) + .map_while(|_| spks_with_expected_txids.next()) .collect::>(); if spks.is_empty() { return Ok(last_active_index); @@ -264,9 +265,9 @@ impl BdkElectrumClient { let spk_histories = self .inner - .batch_script_get_history(spks.iter().map(|(_, s)| s.as_script()))?; + .batch_script_get_history(spks.iter().map(|(_, s)| s.spk.as_script()))?; - for ((spk_index, _spk), spk_history) in spks.into_iter().zip(spk_histories) { + for ((spk_index, spk), spk_history) in spks.into_iter().zip(spk_histories) { if spk_history.is_empty() { unused_spk_count = unused_spk_count.saturating_add(1); if unused_spk_count >= stop_gap { @@ -277,6 +278,17 @@ impl BdkElectrumClient { unused_spk_count = 0; } + let spk_history_set = spk_history + .iter() + .map(|res| res.tx_hash) + .collect::>(); + + tx_update.evicted_ats.extend( + spk.expected_txids + .difference(&spk_history_set) + .map(|&txid| (txid, start_time)), + ); + for tx_res in spk_history { tx_update.txs.push(self.fetch_tx(tx_res.tx_hash)?); match tx_res.height.try_into() { diff --git a/crates/electrum/tests/test_electrum.rs b/crates/electrum/tests/test_electrum.rs index da15e9803..3c1d11803 100644 --- a/crates/electrum/tests/test_electrum.rs +++ b/crates/electrum/tests/test_electrum.rs @@ -5,7 +5,10 @@ use bdk_chain::{ spk_txout::SpkTxOutIndex, Balance, ConfirmationBlockTime, IndexedTxGraph, Indexer, Merge, TxGraph, }; -use bdk_core::bitcoin::Network; +use bdk_core::bitcoin::{ + key::{Secp256k1, UntweakedPublicKey}, + Network, +}; use bdk_electrum::BdkElectrumClient; use bdk_testenv::{ anyhow, @@ -14,12 +17,22 @@ use bdk_testenv::{ }; use core::time::Duration; use electrum_client::ElectrumApi; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; // Batch size for `sync_with_electrum`. const BATCH_SIZE: usize = 5; +pub fn get_test_spk() -> ScriptBuf { + const PK_BYTES: &[u8] = &[ + 12, 244, 72, 4, 163, 4, 211, 81, 159, 82, 153, 123, 125, 74, 142, 40, 55, 237, 191, 231, + 31, 114, 89, 165, 83, 141, 8, 203, 93, 240, 53, 101, + ]; + let secp = Secp256k1::new(); + let pk = UntweakedPublicKey::from_slice(PK_BYTES).expect("Must be valid PK"); + ScriptBuf::new_p2tr(&secp, pk, None) +} + fn get_balance( recv_chain: &LocalChain, recv_graph: &IndexedTxGraph>, @@ -60,6 +73,122 @@ where Ok(update) } +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Electrum chain source and the receiving structures properly track the +// replaced transaction as missing. +#[test] +pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let electrum_client = electrum_client::Client::new(env.electrsd.electrum_url.as_str())?; + let client = BdkElectrumClient::new(electrum_client); + + let mut graph = IndexedTxGraph::::new(SpkTxOutIndex::<()>::default()); + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Get receiving address. + let receiver_spk = get_test_spk(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + graph.index.insert_spk((), receiver_spk); + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .spks_with_indexes(graph.index.all_spks().clone()) + .expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..)); + let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .spks_with_indexes(graph.index.all_spks().clone()) + .expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..)); + let sync_response = client.sync(sync_request, BATCH_SIZE, true)?; + assert!( + sync_response + .tx_update + .evicted_ats + .iter() + .any(|(txid, _)| *txid == send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} + /// If an spk history contains a tx that spends another unconfirmed tx (chained mempool history), /// the Electrum API will return the tx with a negative height. This should succeed and not panic. #[test] diff --git a/crates/esplora/Cargo.toml b/crates/esplora/Cargo.toml index 004236cc3..85054f2e8 100644 --- a/crates/esplora/Cargo.toml +++ b/crates/esplora/Cargo.toml @@ -16,20 +16,19 @@ workspace = true [dependencies] bdk_core = { path = "../core", version = "0.4.1", default-features = false } -esplora-client = { version = "0.11.0", default-features = false } +esplora-client = { version = "0.11.0", default-features = false } async-trait = { version = "0.1.66", optional = true } futures = { version = "0.3.26", optional = true } -miniscript = { version = "12.0.0", optional = true, default-features = false } [dev-dependencies] -esplora-client = { version = "0.11.0" } +esplora-client = { version = "0.11.0" } bdk_chain = { path = "../chain" } bdk_testenv = { path = "../testenv" } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] } [features] default = ["std", "async-https", "blocking-https"] -std = ["bdk_chain/std", "miniscript?/std"] +std = ["bdk_core/std"] tokio = ["esplora-client/tokio"] async = ["async-trait", "futures", "esplora-client/async"] async-https = ["async", "esplora-client/async-https"] diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 4cb34ad80..7d8460c56 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -1,8 +1,10 @@ use async_trait::async_trait; use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::Sleeper; @@ -62,7 +64,7 @@ where stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let mut request = request.into(); + let mut request: FullScanRequest = request.into(); let start_time = request.start_time(); let keychains = request.keychains(); @@ -77,7 +79,9 @@ where let mut inserted_txs = HashSet::::new(); let mut last_active_indices = BTreeMap::::new(); for keychain in keychains { - let keychain_spks = request.iter_spks(keychain.clone()); + let keychain_spks = request + .iter_spks(keychain.clone()) + .map(|(spk_i, spk)| (spk_i, spk.into())); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, start_time, @@ -112,7 +116,7 @@ where request: R, parallel_requests: usize, ) -> Result { - let mut request = request.into(); + let mut request: SyncRequest = request.into(); let start_time = request.start_time(); let chain_tip = request.chain_tip(); @@ -129,7 +133,7 @@ where self, start_time, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_expected_txids(), parallel_requests, ) .await?, @@ -291,10 +295,10 @@ async fn fetch_txs_with_keychain_spks( parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> where - I: Iterator> + Send, + I: Iterator> + Send, S: Sleeper + Clone + Send + Sync, { - type TxsOfSpkIndex = (u32, Vec); + type TxsOfSpkIndex = (u32, Vec, HashSet); let mut update = TxUpdate::::default(); let mut last_index = Option::::None; @@ -306,6 +310,8 @@ where .take(parallel_requests) .map(|(spk_index, spk)| { let client = client.clone(); + let expected_txids = spk.expected_txids; + let spk = spk.spk; async move { let mut last_seen = None; let mut spk_txs = Vec::new(); @@ -315,9 +321,15 @@ where last_seen = txs.last().map(|tx| tx.txid); spk_txs.extend(txs); if tx_count < 25 { - break Result::<_, Error>::Ok((spk_index, spk_txs)); + break; } } + let got_txids = spk_txs.iter().map(|tx| tx.txid).collect::>(); + let evicted_txids = expected_txids + .difference(&got_txids) + .copied() + .collect::>(); + Result::::Ok((spk_index, spk_txs, evicted_txids)) } }) .collect::>(); @@ -326,7 +338,7 @@ where break; } - for (index, txs) in handles.try_collect::>().await? { + for (index, txs, evicted) in handles.try_collect::>().await? { last_index = Some(index); if !txs.is_empty() { last_active_index = Some(index); @@ -338,6 +350,9 @@ where insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status); insert_prevouts(&mut update, tx.vin); } + update + .evicted_ats + .extend(evicted.into_iter().map(|txid| (txid, start_time))); } let last_index = last_index.expect("Must be set since handles wasn't empty."); @@ -370,7 +385,7 @@ async fn fetch_txs_with_spks( parallel_requests: usize, ) -> Result, Error> where - I: IntoIterator + Send, + I: IntoIterator + Send, I::IntoIter: Send, S: Sleeper + Clone + Send + Sync, { diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 36c97195a..bee97feed 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,9 @@ use bdk_core::collections::{BTreeMap, BTreeSet, HashSet}; -use bdk_core::spk_client::{FullScanRequest, FullScanResponse, SyncRequest, SyncResponse}; +use bdk_core::spk_client::{ + FullScanRequest, FullScanResponse, SpkWithExpectedTxids, SyncRequest, SyncResponse, +}; use bdk_core::{ - bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, + bitcoin::{BlockHash, OutPoint, Txid}, BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::{OutputStatus, Tx}; @@ -53,7 +55,7 @@ impl EsploraExt for esplora_client::BlockingClient { stop_gap: usize, parallel_requests: usize, ) -> Result, Error> { - let mut request = request.into(); + let mut request: FullScanRequest = request.into(); let start_time = request.start_time(); let chain_tip = request.chain_tip(); @@ -67,7 +69,9 @@ impl EsploraExt for esplora_client::BlockingClient { let mut inserted_txs = HashSet::::new(); let mut last_active_indices = BTreeMap::::new(); for keychain in request.keychains() { - let keychain_spks = request.iter_spks(keychain.clone()); + let keychain_spks = request + .iter_spks(keychain.clone()) + .map(|(spk_i, spk)| (spk_i, spk.into())); let (update, last_active_index) = fetch_txs_with_keychain_spks( self, start_time, @@ -120,7 +124,7 @@ impl EsploraExt for esplora_client::BlockingClient { self, start_time, &mut inserted_txs, - request.iter_spks(), + request.iter_spks_with_expected_txids(), parallel_requests, )?); tx_update.extend(fetch_txs_with_txids( @@ -254,7 +258,7 @@ fn chain_update( Ok(tip) } -fn fetch_txs_with_keychain_spks>>( +fn fetch_txs_with_keychain_spks>>( client: &esplora_client::BlockingClient, start_time: u64, inserted_txs: &mut HashSet, @@ -262,7 +266,7 @@ fn fetch_txs_with_keychain_spks>>( stop_gap: usize, parallel_requests: usize, ) -> Result<(TxUpdate, Option), Error> { - type TxsOfSpkIndex = (u32, Vec); + type TxsOfSpkIndex = (u32, Vec, HashSet); let mut update = TxUpdate::::default(); let mut last_index = Option::::None; @@ -273,21 +277,27 @@ fn fetch_txs_with_keychain_spks>>( .by_ref() .take(parallel_requests) .map(|(spk_index, spk)| { - std::thread::spawn({ - let client = client.clone(); - move || -> Result { - let mut last_seen = None; - let mut spk_txs = Vec::new(); - loop { - let txs = client.scripthash_txs(&spk, last_seen)?; - let tx_count = txs.len(); - last_seen = txs.last().map(|tx| tx.txid); - spk_txs.extend(txs); - if tx_count < 25 { - break Ok((spk_index, spk_txs)); - } + let client = client.clone(); + let expected_txids = spk.expected_txids; + let spk = spk.spk; + std::thread::spawn(move || -> Result { + let mut last_txid = None; + let mut spk_txs = Vec::new(); + loop { + let txs = client.scripthash_txs(&spk, last_txid)?; + let tx_count = txs.len(); + last_txid = txs.last().map(|tx| tx.txid); + spk_txs.extend(txs); + if tx_count < 25 { + break; } } + let got_txids = spk_txs.iter().map(|tx| tx.txid).collect::>(); + let evicted_txids = expected_txids + .difference(&got_txids) + .copied() + .collect::>(); + Ok((spk_index, spk_txs, evicted_txids)) }) }) .collect::>>>(); @@ -297,7 +307,7 @@ fn fetch_txs_with_keychain_spks>>( } for handle in handles { - let (index, txs) = handle.join().expect("thread must not panic")?; + let (index, txs, evicted) = handle.join().expect("thread must not panic")?; last_index = Some(index); if !txs.is_empty() { last_active_index = Some(index); @@ -309,6 +319,9 @@ fn fetch_txs_with_keychain_spks>>( insert_anchor_or_seen_at_from_status(&mut update, start_time, tx.txid, tx.status); insert_prevouts(&mut update, tx.vin); } + update + .evicted_ats + .extend(evicted.into_iter().map(|txid| (txid, start_time))); } let last_index = last_index.expect("Must be set since handles wasn't empty."); @@ -333,7 +346,7 @@ fn fetch_txs_with_keychain_spks>>( /// requests to make in parallel. /// /// Refer to [crate-level docs](crate) for more. -fn fetch_txs_with_spks>( +fn fetch_txs_with_spks>( client: &esplora_client::BlockingClient, start_time: u64, inserted_txs: &mut HashSet, diff --git a/crates/esplora/tests/async_ext.rs b/crates/esplora/tests/async_ext.rs index b535d2bfa..987f04e41 100644 --- a/crates/esplora/tests/async_ext.rs +++ b/crates/esplora/tests/async_ext.rs @@ -1,15 +1,135 @@ +use bdk_chain::bitcoin::{Address, Amount}; +use bdk_chain::local_chain::LocalChain; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; -use bdk_chain::{ConfirmationBlockTime, TxGraph}; +use bdk_chain::spk_txout::SpkTxOutIndex; +use bdk_chain::{ConfirmationBlockTime, IndexedTxGraph, TxGraph}; use bdk_esplora::EsploraAsyncExt; +use bdk_testenv::bitcoincore_rpc::json::CreateRawTransactionInput; +use bdk_testenv::bitcoincore_rpc::RawTx; +use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; use esplora_client::{self, Builder}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; -use bdk_chain::bitcoin::{Address, Amount}; -use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +mod common; + +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Esplora chain source and the receiving structures properly track the +// replaced transaction as missing. +#[tokio::test] +pub async fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_async()?; + + let mut graph = IndexedTxGraph::::new(SpkTxOutIndex::<()>::default()); + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Get receiving address. + let receiver_spk = common::get_test_spk(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + graph.index.insert_spk((), receiver_spk); + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .spks_with_indexes(graph.index.all_spks().clone()) + .expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..)); + let sync_response = client.sync(sync_request, 1).await?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .spks_with_indexes(graph.index.all_spks().clone()) + .expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..)); + let sync_response = client.sync(sync_request, 1).await?; + assert!( + sync_response + .tx_update + .evicted_ats + .iter() + .any(|(txid, _)| *txid == send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} #[tokio::test] pub async fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { let env = TestEnv::new()?; diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index d4191ceb0..d6f8c448d 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,14 +1,135 @@ +use bdk_chain::bitcoin::{Address, Amount}; +use bdk_chain::local_chain::LocalChain; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; -use bdk_chain::{ConfirmationBlockTime, TxGraph}; +use bdk_chain::spk_txout::SpkTxOutIndex; +use bdk_chain::{ConfirmationBlockTime, IndexedTxGraph, TxGraph}; use bdk_esplora::EsploraExt; +use bdk_testenv::bitcoincore_rpc::json::CreateRawTransactionInput; +use bdk_testenv::bitcoincore_rpc::RawTx; +use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; use esplora_client::{self, Builder}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; -use bdk_chain::bitcoin::{Address, Amount}; -use bdk_testenv::{anyhow, bitcoincore_rpc::RpcApi, TestEnv}; +mod common; + +// Ensure that a wallet can detect a malicious replacement of an incoming transaction. +// +// This checks that both the Esplora chain source and the receiving structures properly track the +// replaced transaction as missing. +#[test] +pub fn detect_receive_tx_cancel() -> anyhow::Result<()> { + const SEND_TX_FEE: Amount = Amount::from_sat(1000); + const UNDO_SEND_TX_FEE: Amount = Amount::from_sat(2000); + + let env = TestEnv::new()?; + let rpc_client = env.rpc_client(); + let base_url = format!("http://{}", &env.electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_blocking(); + + let mut graph = IndexedTxGraph::::new(SpkTxOutIndex::<()>::default()); + let (chain, _) = LocalChain::from_genesis_hash(env.bitcoind.client.get_block_hash(0)?); + + // Get receiving address. + let receiver_spk = common::get_test_spk(); + let receiver_addr = Address::from_script(&receiver_spk, bdk_chain::bitcoin::Network::Regtest)?; + graph.index.insert_spk((), receiver_spk); + + env.mine_blocks(101, None)?; + + // Select a UTXO to use as an input for constructing our test transactions. + let selected_utxo = rpc_client + .list_unspent(None, None, None, Some(false), None)? + .into_iter() + // Find a block reward tx. + .find(|utxo| utxo.amount == Amount::from_int_btc(50)) + .expect("Must find a block reward UTXO"); + + // Derive the sender's address from the selected UTXO. + let sender_spk = selected_utxo.script_pub_key.clone(); + let sender_addr = Address::from_script(&sender_spk, bdk_chain::bitcoin::Network::Regtest) + .expect("Failed to derive address from UTXO"); + + // Setup the common inputs used by both `send_tx` and `undo_send_tx`. + let inputs = [CreateRawTransactionInput { + txid: selected_utxo.txid, + vout: selected_utxo.vout, + sequence: None, + }]; + + // Create and sign the `send_tx` that sends funds to the receiver address. + let send_tx_outputs = HashMap::from([( + receiver_addr.to_string(), + selected_utxo.amount - SEND_TX_FEE, + )]); + let send_tx = rpc_client.create_raw_transaction(&inputs, &send_tx_outputs, None, Some(true))?; + let send_tx = rpc_client + .sign_raw_transaction_with_wallet(send_tx.raw_hex(), None, None)? + .transaction()?; + + // Create and sign the `undo_send_tx` transaction. This redirects funds back to the sender + // address. + let undo_send_outputs = HashMap::from([( + sender_addr.to_string(), + selected_utxo.amount - UNDO_SEND_TX_FEE, + )]); + let undo_send_tx = + rpc_client.create_raw_transaction(&inputs, &undo_send_outputs, None, Some(true))?; + let undo_send_tx = rpc_client + .sign_raw_transaction_with_wallet(undo_send_tx.raw_hex(), None, None)? + .transaction()?; + + // Sync after broadcasting the `send_tx`. Ensure that we detect and receive the `send_tx`. + let send_txid = env.rpc_client().send_raw_transaction(send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .spks_with_indexes(graph.index.all_spks().clone()) + .expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..)); + let sync_response = client.sync(sync_request, 1)?; + assert!( + sync_response + .tx_update + .txs + .iter() + .any(|tx| tx.compute_txid() == send_txid), + "sync response must include the send_tx" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.txs.contains(&send_tx), + "tx graph must deem send_tx relevant and include it" + ); + + // Sync after broadcasting the `undo_send_tx`. Verify that `send_tx` is now missing from the + // mempool. + let undo_send_txid = env + .rpc_client() + .send_raw_transaction(undo_send_tx.raw_hex())?; + env.wait_until_electrum_sees_txid(undo_send_txid, Duration::from_secs(6))?; + let sync_request = SyncRequest::builder() + .chain_tip(chain.tip()) + .spks_with_indexes(graph.index.all_spks().clone()) + .expected_spk_txids(graph.list_expected_spk_txids(&chain, chain.tip().block_id(), ..)); + let sync_response = client.sync(sync_request, 1)?; + assert!( + sync_response + .tx_update + .evicted_ats + .iter() + .any(|(txid, _)| *txid == send_txid), + "sync response must track send_tx as missing from mempool" + ); + let changeset = graph.apply_update(sync_response.tx_update.clone()); + assert!( + changeset.tx_graph.last_evicted.contains_key(&send_txid), + "tx graph must track send_tx as missing" + ); + + Ok(()) +} #[test] pub fn test_update_tx_graph_without_keychain() -> anyhow::Result<()> { diff --git a/crates/esplora/tests/common/mod.rs b/crates/esplora/tests/common/mod.rs new file mode 100644 index 000000000..c629c5029 --- /dev/null +++ b/crates/esplora/tests/common/mod.rs @@ -0,0 +1,14 @@ +use bdk_core::bitcoin::key::{Secp256k1, UntweakedPublicKey}; +use bdk_core::bitcoin::ScriptBuf; + +const PK_BYTES: &[u8] = &[ + 12, 244, 72, 4, 163, 4, 211, 81, 159, 82, 153, 123, 125, 74, 142, 40, 55, 237, 191, 231, 31, + 114, 89, 165, 83, 141, 8, 203, 93, 240, 53, 101, +]; + +#[allow(dead_code)] +pub fn get_test_spk() -> ScriptBuf { + let secp = Secp256k1::new(); + let pk = UntweakedPublicKey::from_slice(PK_BYTES).expect("Must be valid PK"); + ScriptBuf::new_p2tr(&secp, pk, None) +} diff --git a/example-crates/example_electrum/src/main.rs b/example-crates/example_electrum/src/main.rs index 8e3110d68..b6c93a1d0 100644 --- a/example-crates/example_electrum/src/main.rs +++ b/example-crates/example_electrum/src/main.rs @@ -215,6 +215,11 @@ fn main() -> anyhow::Result<()> { eprintln!("[ SCANNING {:03.0}% ] {}", pc, item); }); + request = request.expected_spk_txids(graph.list_expected_spk_txids( + &*chain, + chain_tip.block_id(), + .., + )); if all_spks { request = request.spks_with_indexes(graph.index.revealed_spks(..)); } diff --git a/example-crates/example_esplora/src/main.rs b/example-crates/example_esplora/src/main.rs index 2c00751c2..b6a58e4cf 100644 --- a/example-crates/example_esplora/src/main.rs +++ b/example-crates/example_esplora/src/main.rs @@ -225,7 +225,11 @@ fn main() -> anyhow::Result<()> { { let graph = graph.lock().unwrap(); let chain = chain.lock().unwrap(); - + request = request.expected_spk_txids(graph.list_expected_spk_txids( + &*chain, + local_tip.block_id(), + .., + )); if *all_spks { request = request.spks_with_indexes(graph.index.revealed_spks(..)); }