Skip to content

Commit 800f358

Browse files
committed
feat!: Improve spk-based syncing flow
* Change `TxUpdate::seen_ats` to be a `HashSet<(Txid, u64)>` so we can introduce multiple timestamps per tx. This is useful to introduce both `first_seen` and `last_seen` timestamps to `TxGraph`. This is also a better API for chain-sources as they can just insert timestamps into the field without checking previous values. * Change sync/full-scan flow to have the request structure introduce the sync time instead of introducing the timestamp when apply the `TxUpdate`. This simplifies the `apply_update{_at}` logic and makes `evicted_at` easier to reason about (in the future).
1 parent ee52745 commit 800f358

File tree

11 files changed

+210
-140
lines changed

11 files changed

+210
-140
lines changed

crates/chain/benches/canonicalization.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,9 @@ pub fn many_conflicting_unconfirmed(c: &mut Criterion) {
133133
..new_tx(i)
134134
};
135135
let mut update = TxUpdate::default();
136+
update.seen_ats = [(tx.compute_txid(), i as u64)].into();
136137
update.txs = vec![Arc::new(tx)];
137-
let _ = tx_graph.apply_update_at(update, Some(i as u64));
138+
let _ = tx_graph.apply_update(update);
138139
}
139140
}));
140141
c.bench_function("many_conflicting_unconfirmed::list_canonical_txs", {
@@ -168,8 +169,9 @@ pub fn many_chained_unconfirmed(c: &mut Criterion) {
168169
};
169170
let txid = tx.compute_txid();
170171
let mut update = TxUpdate::default();
172+
update.seen_ats = [(txid, i as u64)].into();
171173
update.txs = vec![Arc::new(tx)];
172-
let _ = tx_graph.apply_update_at(update, Some(i as u64));
174+
let _ = tx_graph.apply_update(update);
173175
// Store the next prevout.
174176
previous_output = OutPoint::new(txid, 0);
175177
}

crates/chain/src/indexed_tx_graph.rs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -91,37 +91,12 @@ where
9191
/// Apply an `update` directly.
9292
///
9393
/// `update` is a [`tx_graph::TxUpdate<A>`] and the resultant changes is returned as [`ChangeSet`].
94-
#[cfg(feature = "std")]
95-
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
9694
pub fn apply_update(&mut self, update: tx_graph::TxUpdate<A>) -> ChangeSet<A, I::ChangeSet> {
9795
let tx_graph = self.graph.apply_update(update);
9896
let indexer = self.index_tx_graph_changeset(&tx_graph);
9997
ChangeSet { tx_graph, indexer }
10098
}
10199

102-
/// Apply the given `update` with an optional `seen_at` timestamp.
103-
///
104-
/// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the
105-
/// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The
106-
/// `last_seen` value is used internally to determine precedence of conflicting unconfirmed
107-
/// transactions (where the transaction with the lower `last_seen` value is omitted from the
108-
/// canonical history).
109-
///
110-
/// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will
111-
/// not be part of the canonical history of transactions.
112-
///
113-
/// Use [`apply_update`](IndexedTxGraph::apply_update) to have the `seen_at` value automatically
114-
/// set to the current time.
115-
pub fn apply_update_at(
116-
&mut self,
117-
update: tx_graph::TxUpdate<A>,
118-
seen_at: Option<u64>,
119-
) -> ChangeSet<A, I::ChangeSet> {
120-
let tx_graph = self.graph.apply_update_at(update, seen_at);
121-
let indexer = self.index_tx_graph_changeset(&tx_graph);
122-
ChangeSet { tx_graph, indexer }
123-
}
124-
125100
/// Insert a floating `txout` of given `outpoint`.
126101
pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) -> ChangeSet<A, I::ChangeSet> {
127102
let graph = self.graph.insert_txout(outpoint, txout);

crates/chain/src/tx_graph.rs

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ impl<A: Ord> From<TxGraph<A>> for TxUpdate<A> {
129129
impl<A: Anchor> From<TxUpdate<A>> for TxGraph<A> {
130130
fn from(update: TxUpdate<A>) -> Self {
131131
let mut graph = TxGraph::<A>::default();
132-
let _ = graph.apply_update_at(update, None);
132+
let _ = graph.apply_update(update);
133133
graph
134134
}
135135
}
@@ -719,52 +719,20 @@ impl<A: Anchor> TxGraph<A> {
719719
///
720720
/// The returned [`ChangeSet`] is the set difference between `update` and `self` (transactions that
721721
/// exist in `update` but not in `self`).
722-
#[cfg(feature = "std")]
723-
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
724722
pub fn apply_update(&mut self, update: TxUpdate<A>) -> ChangeSet<A> {
725-
use std::time::*;
726-
let now = SystemTime::now()
727-
.duration_since(UNIX_EPOCH)
728-
.expect("current time must be greater than epoch anchor");
729-
self.apply_update_at(update, Some(now.as_secs()))
730-
}
731-
732-
/// Extends this graph with the given `update` alongside an optional `seen_at` timestamp.
733-
///
734-
/// `seen_at` represents when the update is seen (in unix seconds). It is used to determine the
735-
/// `last_seen`s for all transactions in the update which have no corresponding anchor(s). The
736-
/// `last_seen` value is used internally to determine precedence of conflicting unconfirmed
737-
/// transactions (where the transaction with the lower `last_seen` value is omitted from the
738-
/// canonical history).
739-
///
740-
/// Not setting a `seen_at` value means unconfirmed transactions introduced by this update will
741-
/// not be part of the canonical history of transactions.
742-
///
743-
/// Use [`apply_update`](TxGraph::apply_update) to have the `seen_at` value automatically set
744-
/// to the current time.
745-
pub fn apply_update_at(&mut self, update: TxUpdate<A>, seen_at: Option<u64>) -> ChangeSet<A> {
746723
let mut changeset = ChangeSet::<A>::default();
747-
let mut unanchored_txs = HashSet::<Txid>::new();
748724
for tx in update.txs {
749-
if unanchored_txs.insert(tx.compute_txid()) {
750-
changeset.merge(self.insert_tx(tx));
751-
}
725+
changeset.merge(self.insert_tx(tx));
752726
}
753727
for (outpoint, txout) in update.txouts {
754728
changeset.merge(self.insert_txout(outpoint, txout));
755729
}
756730
for (anchor, txid) in update.anchors {
757-
unanchored_txs.remove(&txid);
758731
changeset.merge(self.insert_anchor(txid, anchor));
759732
}
760733
for (txid, seen_at) in update.seen_ats {
761734
changeset.merge(self.insert_seen_at(txid, seen_at));
762735
}
763-
if let Some(seen_at) = seen_at {
764-
for txid in unanchored_txs {
765-
changeset.merge(self.insert_seen_at(txid, seen_at));
766-
}
767-
}
768736
changeset
769737
}
770738

crates/chain/tests/test_tx_graph.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ fn insert_txouts() {
9494
// Insert partials transactions.
9595
update.txouts.insert(*outpoint, txout.clone());
9696
// Mark them unconfirmed.
97-
update.seen_ats.insert(outpoint.txid, unconf_seen_at);
97+
update.seen_ats.insert((outpoint.txid, unconf_seen_at));
9898
}
9999

100100
// Insert the full transaction.
@@ -1289,7 +1289,7 @@ fn tx_graph_update_conversion() {
12891289

12901290
for (test_name, update) in test_cases {
12911291
let mut tx_graph = TxGraph::<ConfirmationBlockTime>::default();
1292-
let _ = tx_graph.apply_update_at(update.clone(), None);
1292+
let _ = tx_graph.apply_update(update.clone());
12931293
let update_from_tx_graph: TxUpdate<ConfirmationBlockTime> = tx_graph.into();
12941294

12951295
assert_eq!(

crates/core/src/spk_client.rs

Lines changed: 78 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -87,19 +87,13 @@ impl SyncProgress {
8787
}
8888

8989
/// Builds a [`SyncRequest`].
90+
///
91+
/// Construct with [`SyncRequest::builder`].
9092
#[must_use]
9193
pub struct SyncRequestBuilder<I = ()> {
9294
inner: SyncRequest<I>,
9395
}
9496

95-
impl<I> Default for SyncRequestBuilder<I> {
96-
fn default() -> Self {
97-
Self {
98-
inner: Default::default(),
99-
}
100-
}
101-
}
102-
10397
impl SyncRequestBuilder<()> {
10498
/// Add [`Script`]s that will be synced against.
10599
pub fn spks(self, spks: impl IntoIterator<Item = ScriptBuf>) -> Self {
@@ -210,6 +204,7 @@ impl<I> SyncRequestBuilder<I> {
210204
/// ```
211205
#[must_use]
212206
pub struct SyncRequest<I = ()> {
207+
start_time: u64,
213208
chain_tip: Option<CheckPoint>,
214209
spks: VecDeque<(I, ScriptBuf)>,
215210
spks_consumed: usize,
@@ -220,35 +215,56 @@ pub struct SyncRequest<I = ()> {
220215
inspect: Box<InspectSync<I>>,
221216
}
222217

223-
impl<I> Default for SyncRequest<I> {
224-
fn default() -> Self {
225-
Self {
226-
chain_tip: None,
227-
spks: VecDeque::new(),
228-
spks_consumed: 0,
229-
txids: VecDeque::new(),
230-
txids_consumed: 0,
231-
outpoints: VecDeque::new(),
232-
outpoints_consumed: 0,
233-
inspect: Box::new(|_, _| {}),
234-
}
235-
}
236-
}
237-
238218
impl<I> From<SyncRequestBuilder<I>> for SyncRequest<I> {
239219
fn from(builder: SyncRequestBuilder<I>) -> Self {
240220
builder.inner
241221
}
242222
}
243223

244224
impl<I> SyncRequest<I> {
245-
/// Start building a [`SyncRequest`].
246-
pub fn builder() -> SyncRequestBuilder<I> {
225+
/// Start building [`SyncRequest`] with a given `start_time`.
226+
///
227+
/// `start_time` specifies the start time of sync. Chain sources can use this value to set
228+
/// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction
229+
/// without any `seen_ats` is assumed to be unseen in the mempool.
230+
///
231+
/// Use [`SyncRequest::builder`] to use the current timestamp as `start_time` (this requires
232+
/// `feature = "std"`).
233+
pub fn builder_at(start_time: u64) -> SyncRequestBuilder<I> {
247234
SyncRequestBuilder {
248-
inner: Default::default(),
235+
inner: Self {
236+
start_time,
237+
chain_tip: None,
238+
spks: VecDeque::new(),
239+
spks_consumed: 0,
240+
txids: VecDeque::new(),
241+
txids_consumed: 0,
242+
outpoints: VecDeque::new(),
243+
outpoints_consumed: 0,
244+
inspect: Box::new(|_, _| ()),
245+
},
249246
}
250247
}
251248

249+
/// Start building [`SyncRequest`] with the current timestamp as the `start_time`.
250+
///
251+
/// Use [`SyncRequest::builder_at`] to manually set the `start_time`, or if `feature = "std"`
252+
/// is not available.
253+
#[cfg(feature = "std")]
254+
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
255+
pub fn builder() -> SyncRequestBuilder<I> {
256+
let start_time = std::time::UNIX_EPOCH
257+
.elapsed()
258+
.expect("failed to get current timestamp")
259+
.as_secs();
260+
Self::builder_at(start_time)
261+
}
262+
263+
/// When the sync-request was initiated.
264+
pub fn start_time(&self) -> u64 {
265+
self.start_time
266+
}
267+
252268
/// Get the [`SyncProgress`] of this request.
253269
pub fn progress(&self) -> SyncProgress {
254270
SyncProgress {
@@ -339,19 +355,13 @@ impl<A> Default for SyncResponse<A> {
339355
}
340356

341357
/// Builds a [`FullScanRequest`].
358+
///
359+
/// Construct with [`FullScanRequest::builder`].
342360
#[must_use]
343361
pub struct FullScanRequestBuilder<K> {
344362
inner: FullScanRequest<K>,
345363
}
346364

347-
impl<K> Default for FullScanRequestBuilder<K> {
348-
fn default() -> Self {
349-
Self {
350-
inner: Default::default(),
351-
}
352-
}
353-
}
354-
355365
impl<K: Ord> FullScanRequestBuilder<K> {
356366
/// Set the initial chain tip for the full scan request.
357367
///
@@ -397,6 +407,7 @@ impl<K: Ord> FullScanRequestBuilder<K> {
397407
/// [`chain_tip`](FullScanRequestBuilder::chain_tip) (if provided).
398408
#[must_use]
399409
pub struct FullScanRequest<K> {
410+
start_time: u64,
400411
chain_tip: Option<CheckPoint>,
401412
spks_by_keychain: BTreeMap<K, Box<dyn Iterator<Item = Indexed<ScriptBuf>> + Send>>,
402413
inspect: Box<InspectFullScan<K>>,
@@ -408,22 +419,43 @@ impl<K> From<FullScanRequestBuilder<K>> for FullScanRequest<K> {
408419
}
409420
}
410421

411-
impl<K> Default for FullScanRequest<K> {
412-
fn default() -> Self {
413-
Self {
414-
chain_tip: None,
415-
spks_by_keychain: Default::default(),
416-
inspect: Box::new(|_, _, _| {}),
422+
impl<K: Ord + Clone> FullScanRequest<K> {
423+
/// Start building a [`FullScanRequest`] with a given `start_time`.
424+
///
425+
/// `start_time` specifies the start time of sync. Chain sources can use this value to set
426+
/// [`TxUpdate::seen_ats`](crate::TxUpdate::seen_ats) for mempool transactions. A transaction
427+
/// without any `seen_ats` is assumed to be unseen in the mempool.
428+
///
429+
/// Use [`FullScanRequest::builder`] to use the current timestamp as `start_time` (this
430+
/// requires `feature = "std`).
431+
pub fn builder_at(start_time: u64) -> FullScanRequestBuilder<K> {
432+
FullScanRequestBuilder {
433+
inner: Self {
434+
start_time,
435+
chain_tip: None,
436+
spks_by_keychain: BTreeMap::new(),
437+
inspect: Box::new(|_, _, _| ()),
438+
},
417439
}
418440
}
419-
}
420441

421-
impl<K: Ord + Clone> FullScanRequest<K> {
422-
/// Start building a [`FullScanRequest`].
442+
/// Start building a [`FullScanRequest`] with the current timestamp as the `start_time`.
443+
///
444+
/// Use [`FullScanRequest::builder_at`] to manually set the `start_time`, or if `feature =
445+
/// "std"` is not available.
446+
#[cfg(feature = "std")]
447+
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
423448
pub fn builder() -> FullScanRequestBuilder<K> {
424-
FullScanRequestBuilder {
425-
inner: Self::default(),
426-
}
449+
let start_time = std::time::UNIX_EPOCH
450+
.elapsed()
451+
.expect("failed to get current timestamp")
452+
.as_secs();
453+
Self::builder_at(start_time)
454+
}
455+
456+
/// When the full-scan-request was initiated.
457+
pub fn start_time(&self) -> u64 {
458+
self.start_time
427459
}
428460

429461
/// Get the chain tip [`CheckPoint`] of this request (if any).

crates/core/src/tx_update.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::collections::{BTreeMap, BTreeSet, HashMap};
1+
use crate::collections::{BTreeMap, BTreeSet, HashSet};
22
use alloc::{sync::Arc, vec::Vec};
33
use bitcoin::{OutPoint, Transaction, TxOut, Txid};
44

@@ -24,16 +24,26 @@ pub struct TxUpdate<A = ()> {
2424
/// Full transactions. These are transactions that were determined to be relevant to the wallet
2525
/// given the request.
2626
pub txs: Vec<Arc<Transaction>>,
27+
2728
/// Floating txouts. These are `TxOut`s that exist but the whole transaction wasn't included in
2829
/// `txs` since only knowing about the output is important. These are often used to help determine
2930
/// the fee of a wallet transaction.
3031
pub txouts: BTreeMap<OutPoint, TxOut>,
32+
3133
/// Transaction anchors. Anchors tells us a position in the chain where a transaction was
3234
/// confirmed.
3335
pub anchors: BTreeSet<(A, Txid)>,
34-
/// Seen at times for transactions. This records when a transaction was most recently seen in
35-
/// the user's mempool for the sake of tie-breaking other conflicting transactions.
36-
pub seen_ats: HashMap<Txid, u64>,
36+
37+
/// When transactions were seen in the mempool.
38+
///
39+
/// An unconfirmed transaction can only be canonical with a `seen_at` value. It is the
40+
/// responsibility of the chain-source to include the `seen_at` values for unconfirmed
41+
/// (unanchored) transactions.
42+
///
43+
/// [`FullScanRequest::start_time`](crate::spk_client::FullScanRequest::start_time) or
44+
/// [`SyncRequest::start_time`](crate::spk_client::SyncRequest::start_time) can be used to
45+
/// provide the `seen_at` value.
46+
pub seen_ats: HashSet<(Txid, u64)>,
3747
}
3848

3949
impl<A> Default for TxUpdate<A> {

0 commit comments

Comments
 (0)