Skip to content

Commit fa23e50

Browse files
committed
Improves reorg logic by checking whether we are in sync with the backend or not
The current reorg logic does not really take into account whether we are in sync with the backend or not. On the first block connected after a reorg, it will try to send all reorged-out data, assuming it has knowledge of everything that is already confirmed or in the mempool. However, in a multi-block reorg the backend could be at a height that the tower has not processed yet, hence some transactions may be on the chain but not in our internal `txindex`. Therefore, it could be the case that we try to re-send something that has been reorged out and see it bounce. Under normal conditions, that would mean the transaction was confirmed a long time ago, since otherwise it would be in our index. However, in this case it may be that it was just confirmed in a subsequent block we haven't processed yet. This will lead to wrongly assuming the tracker was `IRREVOCABLY RESOLVED`, while in reality it may only have a few confirmations. This patch fixes that. In the case of a transaction bouncing, we will check whether we are in sync with the backend, and only if so consider the tracker as `IRREVOCABLY RESOLVED`. Otherwise, the tracker will be flagged as `OffSync` and retried until it bounces when we are in sync, or its status is updated on block processing. For context, this edge case was introduced when adding support for prune mode. Before that (when `txindex` for the backend was required) we would have used `getrawtransaction` to check the confirmation count of the bouncing transaction.
1 parent bacac07 commit fa23e50

File tree

4 files changed

+109
-28
lines changed

4 files changed

+109
-28
lines changed

teos/src/carrier.rs

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,23 @@ impl Carrier {
104104
ConfirmationStatus::Rejected(rpc_errors::RPC_VERIFY_ERROR)
105105
}
106106
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN => {
107-
log::info!(
108-
"Transaction was confirmed long ago, not keeping track of it: {}",
109-
tx.txid()
110-
);
111-
112-
// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
113-
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], meaning that if the transaction bounces it was confirmed long
114-
// ago (> IRREVOCABLY_RESOLVED), so we don't need to worry about it.
115-
ConfirmationStatus::IrrevocablyResolved
107+
if self.bitcoin_cli.get_block_count().unwrap() as u32 > self.block_height {
108+
// We are out of sync, either we are trying to send things to bitcoind after a reorg (and we have not reached the new tip yet)
109+
// or a block was found right when we were trying to send something and we have not yet processed it.
110+
// In both cases we don't know if what we are trying to send is really old (IRREVOCABLY_RESOLVED) or if it'll just be processed
111+
// on our way up to the new tip (we need to make this work both for txindex and prune mode, so getrawtransaction is not an option)
112+
ConfirmationStatus::OffSync
113+
} else {
114+
log::info!(
115+
"Transaction was confirmed long ago, not keeping track of it: {}",
116+
tx.txid()
117+
);
118+
119+
// Given we are not using txindex, if a transaction bounces we cannot get its confirmation count. However, [send_transaction] is guarded by
120+
// checking whether the transaction id can be found in the [Responder]'s [TxIndex], and we know we are on sync, so the transaction
121+
// must have been confirmed long ago (> IRREVOCABLY_RESOLVED). Therefore, we don't need to worry about it anymore.
122+
ConfirmationStatus::IrrevocablyResolved
123+
}
116124
}
117125
rpc_errors::RPC_DESERIALIZATION_ERROR => {
118126
// Adding this here just for completeness. We should never end up here. The Carrier only sends txs handed by the Responder,
@@ -311,12 +319,44 @@ mod tests {
311319

312320
#[test]
313321
fn test_send_transaction_verify_already_in_chain() {
314-
let bitcoind_mock = BitcoindMock::new(MockOptions::with_error(
315-
rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64,
316-
));
322+
let start_height = START_HEIGHT as u32;
323+
// Set the backend to be one block ahead of us
324+
let bitcoind_mock = BitcoindMock::new(
325+
MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64)
326+
.at_height(start_height + 1),
327+
);
317328
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
318329
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
330+
start_server(bitcoind_mock.server);
331+
332+
let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);
333+
let tx = consensus::deserialize(&Vec::from_hex(TX_HEX).unwrap()).unwrap();
334+
let r = carrier.send_transaction(&tx);
335+
336+
// We are offsync, so the transaction should bounce but tell us about it
337+
assert_eq!(r, ConfirmationStatus::OffSync);
338+
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);
339+
340+
// Try again, but this time being onsync, now we should get an IrrevocablyResolved
341+
// We first need to clear the issued_receipts
342+
carrier.issued_receipts.remove(&tx.txid());
343+
// And either increase our height
344+
carrier.block_height += 1;
345+
346+
let r = carrier.send_transaction(&tx);
347+
assert_eq!(r, ConfirmationStatus::IrrevocablyResolved);
348+
assert_eq!(carrier.issued_receipts.get(&tx.txid()).unwrap(), &r);
349+
}
350+
351+
#[test]
352+
fn test_send_transaction_verify_already_in_chain_offsync() {
319353
let start_height = START_HEIGHT as u32;
354+
let bitcoind_mock = BitcoindMock::new(
355+
MockOptions::with_error(rpc_errors::RPC_VERIFY_ALREADY_IN_CHAIN as i64)
356+
.at_height(start_height),
357+
);
358+
let bitcoind_reachable = Arc::new((Mutex::new(true), Condvar::new()));
359+
let bitcoin_cli = Arc::new(BitcoindClient::new(bitcoind_mock.url(), Auth::None).unwrap());
320360
start_server(bitcoind_mock.server);
321361

322362
let mut carrier = Carrier::new(bitcoin_cli, bitcoind_reachable, start_height);

teos/src/responder.rs

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub enum ConfirmationStatus {
2929
ConfirmedIn(u32),
3030
InMempoolSince(u32),
3131
IrrevocablyResolved,
32+
OffSync,
3233
Rejected(i32),
3334
ReorgedOut,
3435
}
@@ -70,6 +71,16 @@ impl ConfirmationStatus {
7071
ConfirmationStatus::ConfirmedIn(_) | &ConfirmationStatus::InMempoolSince(_)
7172
)
7273
}
74+
75+
/// Whether the transaction was rejected
76+
pub fn rejected(&self) -> bool {
77+
matches!(self, ConfirmationStatus::Rejected(_))
78+
}
79+
80+
/// Whether the transaction couldn't be processed because we are off sync.
81+
pub fn off_sync(&self) -> bool {
82+
matches!(self, ConfirmationStatus::OffSync)
83+
}
7384
}
7485

7586
/// Minimal data required in memory to keep track of transaction trackers.
@@ -222,7 +233,7 @@ impl Responder {
222233
carrier.send_transaction(&breach.penalty_tx)
223234
};
224235

225-
if status.accepted() {
236+
if status.accepted() || status.off_sync() {
226237
self.add_tracker(uuid, breach, user_id, status);
227238
}
228239

@@ -348,17 +359,19 @@ impl Responder {
348359
) -> HashMap<UUID, (Transaction, Option<Transaction>)> {
349360
let dbm = self.dbm.lock().unwrap();
350361
let mut tx_to_rebroadcast = HashMap::new();
351-
let mut tracker: TransactionTracker;
352362

353363
for (uuid, t) in self.trackers.lock().unwrap().iter() {
354364
if let ConfirmationStatus::InMempoolSince(h) = t.status {
355365
if (height - h) as u8 >= CONFIRMATIONS_BEFORE_RETRY {
356-
tracker = dbm.load_tracker(*uuid).unwrap();
357-
tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, None));
366+
tx_to_rebroadcast
367+
.insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None));
358368
}
359369
} else if let ConfirmationStatus::ReorgedOut = t.status {
360-
tracker = dbm.load_tracker(*uuid).unwrap();
370+
let tracker = dbm.load_tracker(*uuid).unwrap();
361371
tx_to_rebroadcast.insert(*uuid, (tracker.penalty_tx, Some(tracker.dispute_tx)));
372+
} else if t.status.off_sync() {
373+
tx_to_rebroadcast
374+
.insert(*uuid, (dbm.load_tracker(*uuid).unwrap().penalty_tx, None));
362375
}
363376
}
364377

@@ -413,20 +426,26 @@ impl Responder {
413426
if tx_index.contains_key(&dispute_tx.txid())
414427
| carrier.in_mempool(&dispute_tx.txid())
415428
{
416-
// Dispute tx is on chain (or mempool), so we only need to care about the penalty
429+
// Dispute tx is on chain (or mempool), so we only need to care about the penalty.
430+
// We know for a fact that the penalty is not in the index, because otherwise it would have received
431+
// a confirmation during the processing of the current block (hence it would not have been passed to this method)
417432
carrier.send_transaction(&penalty_tx)
418433
} else {
419434
// Dispute tx has also been reorged out, meaning that both transactions need to be broadcast.
420435
// DISCUSS: For lightning transactions, if the dispute has been reorged the penalty cannot make it to the network.
421436
// If we keep this general, the dispute can simply be a trigger and the penalty doesn't necessarily have to spend from it.
422-
// We'll keel it lightning specific, at least for now.
437+
// We'll keep it lightning specific, at least for now.
423438
let status = carrier.send_transaction(&dispute_tx);
424439
if let ConfirmationStatus::Rejected(e) = status {
425440
log::error!(
426441
"Reorged dispute transaction rejected during rebroadcast: {} (reason: {e})",
427442
dispute_tx.txid()
428443
);
429444
status
445+
} else if status.off_sync() {
446+
// If the dispute bounces because we are off-sync, we want to try again with the whole package. Hence, we leave this as
447+
// reorged.
448+
ConfirmationStatus::ReorgedOut
430449
} else {
431450
// The dispute was accepted, so we can rebroadcast the penalty.
432451
carrier.send_transaction(&penalty_tx)
@@ -435,7 +454,7 @@ impl Responder {
435454
} else {
436455
// The tracker has simply reached CONFIRMATIONS_BEFORE_RETRY missed confirmations.
437456
log::warn!(
438-
"Penalty transaction has missed many confirmations: {}",
457+
"Penalty transaction has missed many confirmations or was sent while we were off-sync with the backend: {}",
439458
penalty_tx.txid()
440459
);
441460
carrier.send_transaction(&penalty_tx)
@@ -444,11 +463,14 @@ impl Responder {
444463
if let ConfirmationStatus::Rejected(_) = status {
445464
rejected.insert(uuid);
446465
} else {
447-
// Update the tracker if it gets accepted. This will also update the height (since when we are counting the tracker
448-
// to have been in mempool), so it resets the wait period instead of trying to rebroadcast every block.
466+
// Update the status if the tracker is not rejected. This will update the height for InMempoolSince, resetting the
467+
// missed confirmation counter, or flag it as OffSync if we happen to not be on sync.
449468
// DISCUSS: We may want to find another approach in the future for the InMempool transactions.
450469
trackers.get_mut(&uuid).unwrap().status = status;
451-
accepted.insert(uuid, status);
470+
471+
if status.accepted() {
472+
accepted.insert(uuid, status);
473+
}
452474
}
453475
}
454476

teos/src/test_utils.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,30 +538,39 @@ pub(crate) struct BitcoindMock {
538538

539539
#[derive(Default)]
540540
pub(crate) struct MockOptions {
541+
height: u32,
541542
error_code: Option<i64>,
542543
in_mempool: bool,
543544
}
544545

545546
impl MockOptions {
546547
pub fn with_error(error_code: i64) -> Self {
547548
Self {
549+
height: 0,
548550
error_code: Some(error_code),
549551
in_mempool: false,
550552
}
551553
}
552554

553555
pub fn in_mempool() -> Self {
554556
Self {
557+
height: 0,
555558
error_code: None,
556559
in_mempool: true,
557560
}
558561
}
562+
563+
pub fn at_height(self, h: u32) -> Self {
564+
Self { height: h, ..self }
565+
}
559566
}
560567

561568
impl BitcoindMock {
562569
pub fn new(options: MockOptions) -> Self {
563570
let mut io = IoHandler::default();
564571

572+
BitcoindMock::add_getblockcount(&mut io, options.height);
573+
565574
if let Some(error) = options.error_code {
566575
io.add_sync_method("error", move |_params: Params| {
567576
Err(JsonRpcError::new(JsonRpcErrorCode::ServerError(error)))
@@ -614,6 +623,12 @@ impl BitcoindMock {
614623
})
615624
}
616625

626+
fn add_getblockcount(io: &mut IoHandler, h: u32) {
627+
io.add_sync_method("getblockcount", move |_params: Params| {
628+
Ok(Value::Number(h.into()))
629+
});
630+
}
631+
617632
pub fn url(&self) -> &str {
618633
&self.url
619634
}

teos/src/watcher.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -700,11 +700,15 @@ impl chain::Listen for Watcher {
700700
for (uuid, breach) in valid_breaches {
701701
log::info!("Notifying Responder and deleting appointment (uuid: {uuid})");
702702

703-
if let ConfirmationStatus::Rejected(_) = self.responder.handle_breach(
704-
uuid,
705-
breach,
706-
self.appointments.lock().unwrap()[&uuid].user_id,
707-
) {
703+
if self
704+
.responder
705+
.handle_breach(
706+
uuid,
707+
breach,
708+
self.appointments.lock().unwrap()[&uuid].user_id,
709+
)
710+
.rejected()
711+
{
708712
appointments_to_delete.insert(uuid);
709713
} else {
710714
delivered_appointments.insert(uuid);

0 commit comments

Comments
 (0)