Skip to content

Commit

Permalink
[Cherry Pick] Fix race condition for transaction cache (#20875) (#20882)
Browse files Browse the repository at this point in the history
Bug was (likely) as follows:

      Reader thread               | Writer thread (state sync)

                                    invalidate tickets
      get ticket
      miss cache
      read database
      insert to cache               insert to cache (racing)
      ticket is valid
      panic
                                    write to database

The writer thread must write to the database before inserting into the
cache, so that a reader holding a valid ticket can never read an old value
from the database.

## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:

Co-authored-by: Eugene Boguslavsky <[email protected]>
  • Loading branch information
mystenmark and ebmifa authored Jan 14, 2025
1 parent e8141a7 commit fea2f27
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 10 deletions.
11 changes: 7 additions & 4 deletions crates/sui-core/src/execution_cache/cache_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;
use std::{cmp::Ordering, hash::DefaultHasher};

use moka::sync::Cache as MokaCache;
use mysten_common::debug_fatal;
use parking_lot::Mutex;
use sui_types::base_types::SequenceNumber;

Expand Down Expand Up @@ -292,10 +293,12 @@ where
let mut entry = entry.value().lock();
check_ticket()?;

// Ticket expiry makes this assert impossible.
// TODO: relax to debug_assert?
assert!(!entry.is_newer_than(&value), "entry is newer than value");
*entry = value;
// Ticket expiry should make this assert impossible.
if entry.is_newer_than(&value) {
debug_fatal!("entry is newer than value");
} else {
*entry = value;
}
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1339,3 +1339,60 @@ async fn latest_object_cache_race_test() {
checker.join().unwrap();
invalidator.join().unwrap();
}

/// Stress test that runs transaction inserts and transaction reads against
/// the same cache from two OS threads in lockstep.
///
/// 1000 transactions are executed up front and their (transaction, effects)
/// pairs are collected. A writer thread then inserts each pair into the cache
/// while a reader thread looks up the same transaction's digest. A two-party
/// barrier is waited on before every step so both threads touch the same
/// transaction at roughly the same moment. The test passes as long as neither
/// thread panics.
#[tokio::test]
async fn test_transaction_cache_race() {
    telemetry_subscribers::init_for_testing();

    let mut scenario = Scenario::new(None, Arc::new(AtomicU32::new(0))).await;
    let cache = scenario.cache.clone();

    // Execute the transactions first and keep their outputs for the threads.
    let mut executed = Vec::new();
    for i in 0..1000 {
        // Each transaction creates one object, identified by `i * 4`.
        scenario.with_created(&[i * 4]);
        scenario.do_tx().await;

        let outputs = scenario.take_outputs();
        executed.push(((*outputs.transaction).clone(), outputs.effects.clone()));
    }

    // Two participants: the writer and the reader.
    let gate = Arc::new(std::sync::Barrier::new(2));

    let writer = {
        let (executed, cache, gate) = (executed.clone(), cache.clone(), gate.clone());
        std::thread::spawn(move || {
            for (idx, (tx, effects)) in executed.into_iter().enumerate() {
                gate.wait();
                // Alternate between the multi-insert (odd indices) and the
                // single-insert (even indices) entry points so both are covered.
                if idx % 2 != 0 {
                    let batch = [VerifiedExecutionData::new(tx, effects)];
                    cache.multi_insert_transaction_and_effects(&batch);
                } else {
                    cache.insert_transaction_and_effects(&tx, &effects);
                }
            }
        })
    };

    let reader = {
        let (executed, cache, gate) = (executed.clone(), cache.clone(), gate.clone());
        std::thread::spawn(move || {
            for (tx, _) in executed {
                gate.wait();
                // Only the absence of a panic matters; the result is discarded.
                cache.get_transaction_block(tx.digest());
            }
        })
    };

    // Propagate a panic from either thread as a test failure.
    for handle in [writer, reader] {
        handle.join().unwrap();
    }
}
12 changes: 6 additions & 6 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2268,6 +2268,9 @@ impl StateSyncAPI for WritebackCache {
transaction: &VerifiedTransaction,
transaction_effects: &TransactionEffects,
) {
self.store
.insert_transaction_and_effects(transaction, transaction_effects)
.expect("db error");
self.cached
.transactions
.insert(
Expand All @@ -2284,15 +2287,15 @@ impl StateSyncAPI for WritebackCache {
Ticket::Write,
)
.ok();
self.store
.insert_transaction_and_effects(transaction, transaction_effects)
.expect("db error");
}

fn multi_insert_transaction_and_effects(
&self,
transactions_and_effects: &[VerifiedExecutionData],
) {
self.store
.multi_insert_transaction_and_effects(transactions_and_effects.iter())
.expect("db error");
for VerifiedExecutionData {
transaction,
effects,
Expand All @@ -2315,8 +2318,5 @@ impl StateSyncAPI for WritebackCache {
)
.ok();
}
self.store
.multi_insert_transaction_and_effects(transactions_and_effects.iter())
.expect("db error");
}
}

0 comments on commit fea2f27

Please sign in to comment.