Skip to content

Commit

Permalink
Refactor TransactionDBImpl
Browse files Browse the repository at this point in the history
Summary:
This opens space for the new implementations of TransactionDBImpl such as WritePreparedTxnDBImpl that has a different policy of how to write to DB.
Closes facebook#2689

Differential Revision: D5568918

Pulled By: maysamyabandeh

fbshipit-source-id: f7eac866e175daf3793ae79da108f65cc7dc7b25
  • Loading branch information
Maysam Yabandeh authored and facebook-github-bot committed Aug 6, 2017
1 parent 20dc5e7 commit c9804e0
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 65 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ set(SOURCES
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction_impl.cc
utilities/transactions/transaction_base.cc
utilities/transactions/transaction_db_impl.cc
utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/transaction_impl.cc
utilities/transactions/transaction_lock_mgr.cc
Expand Down
3 changes: 2 additions & 1 deletion TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,12 @@ cpp_library(
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/optimistic_transaction_impl.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_impl.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_transaction_impl.cc",
"utilities/ttl/db_ttl_impl.cc",
"utilities/write_batch_with_index/write_batch_with_index.cc",
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
Expand Down
12 changes: 12 additions & 0 deletions include/rocksdb/utilities/transaction_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ namespace rocksdb {

class TransactionDBMutexFactory;

enum TxnDBWritePolicy {
WRITE_COMMITTED = 0, // write only the committed data
WRITE_PREPARED, // write data after the prepare phase of 2pc
WRITE_UNPREPARED // write data before the prepare phase of 2pc
};

struct TransactionDBOptions {
// Specifies the maximum number of keys that can be locked at the same time
// per column family.
Expand Down Expand Up @@ -66,6 +72,12 @@ struct TransactionDBOptions {
// condition variable for all transaction locking instead of the default
// mutex/condvar implementation.
std::shared_ptr<TransactionDBMutexFactory> custom_mutex_factory;

// The policy for when to write the data into the DB. The default policy is to
// write only the committed data (WRITE_COMMITTED). The data could be written
// before the commit phase. The DB then needs to provide the mechanisms to
// tell apart committed from uncommitted data.
TxnDBWritePolicy write_policy;
};

struct TransactionOptions {
Expand Down
2 changes: 1 addition & 1 deletion src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ LIB_SOURCES = \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/optimistic_transaction_impl.cc \
utilities/transactions/transaction_base.cc \
utilities/transactions/transaction_db_impl.cc \
utilities/transactions/pessimistic_transaction_db.cc \
utilities/transactions/transaction_db_mutex_impl.cc \
utilities/transactions/transaction_impl.cc \
utilities/transactions/transaction_lock_mgr.cc \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#ifndef ROCKSDB_LITE

#include "utilities/transactions/transaction_db_impl.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

#include <string>
#include <unordered_set>
Expand All @@ -21,8 +21,8 @@

namespace rocksdb {

TransactionDBImpl::TransactionDBImpl(DB* db,
const TransactionDBOptions& txn_db_options)
PessimisticTransactionDB::PessimisticTransactionDB(
DB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
txn_db_options_(txn_db_options),
Expand All @@ -34,9 +34,9 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
assert(db_impl_ != nullptr);
}

// Support initiliazing TransactionDBImpl from a stackable db
// Support initiliazing PessimisticTransactionDB from a stackable db
//
// TransactionDBImpl
// PessimisticTransactionDB
// ^ ^
// | |
// | +
Expand All @@ -50,8 +50,8 @@ TransactionDBImpl::TransactionDBImpl(DB* db,
// +
// DB
//
TransactionDBImpl::TransactionDBImpl(StackableDB* db,
const TransactionDBOptions& txn_db_options)
PessimisticTransactionDB::PessimisticTransactionDB(
StackableDB* db, const TransactionDBOptions& txn_db_options)
: TransactionDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
txn_db_options_(txn_db_options),
Expand All @@ -63,13 +63,13 @@ TransactionDBImpl::TransactionDBImpl(StackableDB* db,
assert(db_impl_ != nullptr);
}

TransactionDBImpl::~TransactionDBImpl() {
PessimisticTransactionDB::~PessimisticTransactionDB() {
while (!transactions_.empty()) {
delete transactions_.begin()->second;
}
}

Status TransactionDBImpl::Initialize(
Status PessimisticTransactionDB::Initialize(
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles) {
for (auto cf_ptr : handles) {
Expand Down Expand Up @@ -121,7 +121,7 @@ Status TransactionDBImpl::Initialize(
return s;
}

Transaction* TransactionDBImpl::BeginTransaction(
Transaction* WriteCommittedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
Expand All @@ -132,7 +132,18 @@ Transaction* TransactionDBImpl::BeginTransaction(
}
}

TransactionDBOptions TransactionDBImpl::ValidateTxnDBOptions(
Transaction* WritePreparedTxnDB::BeginTransaction(
const WriteOptions& write_options, const TransactionOptions& txn_options,
Transaction* old_txn) {
if (old_txn != nullptr) {
ReinitializeTransaction(old_txn, write_options, txn_options);
return old_txn;
} else {
return new WritePreparedTxnImpl(this, write_options, txn_options);
}
}

TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
const TransactionDBOptions& txn_db_options) {
TransactionDBOptions validated = txn_db_options;

Expand Down Expand Up @@ -213,8 +224,19 @@ Status TransactionDB::WrapDB(
DB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
TransactionDBImpl* txn_db = new TransactionDBImpl(
db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options));
PessimisticTransactionDB* txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
Expand All @@ -227,20 +249,32 @@ Status TransactionDB::WrapStackableDB(
StackableDB* db, const TransactionDBOptions& txn_db_options,
const std::vector<size_t>& compaction_enabled_cf_indices,
const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
TransactionDBImpl* txn_db = new TransactionDBImpl(
db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options));
PessimisticTransactionDB* txn_db;
switch (txn_db_options.write_policy) {
case WRITE_UNPREPARED:
return Status::NotSupported("WRITE_UNPREPARED is not implemented yet");
case WRITE_PREPARED:
txn_db = new WritePreparedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
break;
case WRITE_COMMITTED:
default:
txn_db = new WriteCommittedTxnDB(
db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options));
}
*dbptr = txn_db;
Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
return s;
}

// Let TransactionLockMgr know that this column family exists so it can
// allocate a LockMap for it.
void TransactionDBImpl::AddColumnFamily(const ColumnFamilyHandle* handle) {
void PessimisticTransactionDB::AddColumnFamily(
const ColumnFamilyHandle* handle) {
lock_mgr_.AddColumnFamily(handle->GetID());
}

Status TransactionDBImpl::CreateColumnFamily(
Status PessimisticTransactionDB::CreateColumnFamily(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle) {
InstrumentedMutexLock l(&column_family_mutex_);
Expand All @@ -255,7 +289,8 @@ Status TransactionDBImpl::CreateColumnFamily(

// Let TransactionLockMgr know that it can deallocate the LockMap for this
// column family.
Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
Status PessimisticTransactionDB::DropColumnFamily(
ColumnFamilyHandle* column_family) {
InstrumentedMutexLock l(&column_family_mutex_);

Status s = db_->DropColumnFamily(column_family);
Expand All @@ -266,23 +301,24 @@ Status TransactionDBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
return s;
}

Status TransactionDBImpl::TryLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key, bool exclusive) {
Status PessimisticTransactionDB::TryLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key,
bool exclusive) {
return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
}

void TransactionDBImpl::UnLock(PessimisticTxn* txn,
const TransactionKeyMap* keys) {
void PessimisticTransactionDB::UnLock(PessimisticTxn* txn,
const TransactionKeyMap* keys) {
lock_mgr_.UnLock(txn, keys, GetEnv());
}

void TransactionDBImpl::UnLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key) {
void PessimisticTransactionDB::UnLock(PessimisticTxn* txn, uint32_t cfh_id,
const std::string& key) {
lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
}

// Used when wrapping DB write operations in a transaction
Transaction* TransactionDBImpl::BeginInternalTransaction(
Transaction* PessimisticTransactionDB::BeginInternalTransaction(
const WriteOptions& options) {
TransactionOptions txn_options;
Transaction* txn = BeginTransaction(options, txn_options, nullptr);
Expand All @@ -301,9 +337,9 @@ Transaction* TransactionDBImpl::BeginInternalTransaction(
// sort its keys before locking them. This guarantees that TransactionDB write
// methods cannot deadlock with eachother (but still could deadlock with a
// Transaction).
Status TransactionDBImpl::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
Status PessimisticTransactionDB::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) {
Status s;

Transaction* txn = BeginInternalTransaction(options);
Expand All @@ -322,9 +358,9 @@ Status TransactionDBImpl::Put(const WriteOptions& options,
return s;
}

Status TransactionDBImpl::Delete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) {
Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) {
Status s;

Transaction* txn = BeginInternalTransaction(wopts);
Expand All @@ -344,9 +380,9 @@ Status TransactionDBImpl::Delete(const WriteOptions& wopts,
return s;
}

Status TransactionDBImpl::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status PessimisticTransactionDB::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
Status s;

Transaction* txn = BeginInternalTransaction(options);
Expand All @@ -366,7 +402,8 @@ Status TransactionDBImpl::Merge(const WriteOptions& options,
return s;
}

Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
Status PessimisticTransactionDB::Write(const WriteOptions& opts,
WriteBatch* updates) {
// Need to lock all keys in this batch to prevent write conflicts with
// concurrent transactions.
Transaction* txn = BeginInternalTransaction(opts);
Expand All @@ -385,19 +422,19 @@ Status TransactionDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
return s;
}

void TransactionDBImpl::InsertExpirableTransaction(TransactionID tx_id,
PessimisticTxn* tx) {
void PessimisticTransactionDB::InsertExpirableTransaction(TransactionID tx_id,
PessimisticTxn* tx) {
assert(tx->GetExpirationTime() > 0);
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.insert({tx_id, tx});
}

void TransactionDBImpl::RemoveExpirableTransaction(TransactionID tx_id) {
void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
std::lock_guard<std::mutex> lock(map_mutex_);
expirable_transactions_map_.erase(tx_id);
}

bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
TransactionID tx_id) {
std::lock_guard<std::mutex> lock(map_mutex_);

Expand All @@ -409,15 +446,15 @@ bool TransactionDBImpl::TryStealingExpiredTransactionLocks(
return tx.TryStealingLocks();
}

void TransactionDBImpl::ReinitializeTransaction(
void PessimisticTransactionDB::ReinitializeTransaction(
Transaction* txn, const WriteOptions& write_options,
const TransactionOptions& txn_options) {
auto txn_impl = static_cast_with_check<PessimisticTxn, Transaction>(txn);

txn_impl->Reinitialize(this, write_options, txn_options);
}

Transaction* TransactionDBImpl::GetTransactionByName(
Transaction* PessimisticTransactionDB::GetTransactionByName(
const TransactionName& name) {
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(name);
Expand All @@ -428,7 +465,7 @@ Transaction* TransactionDBImpl::GetTransactionByName(
}
}

void TransactionDBImpl::GetAllPreparedTransactions(
void PessimisticTransactionDB::GetAllPreparedTransactions(
std::vector<Transaction*>* transv) {
assert(transv);
transv->clear();
Expand All @@ -440,11 +477,12 @@ void TransactionDBImpl::GetAllPreparedTransactions(
}
}

TransactionLockMgr::LockStatusData TransactionDBImpl::GetLockStatusData() {
TransactionLockMgr::LockStatusData
PessimisticTransactionDB::GetLockStatusData() {
return lock_mgr_.GetLockStatusData();
}

void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
assert(txn);
assert(txn->GetName().length() > 0);
assert(GetTransactionByName(txn->GetName()) == nullptr);
Expand All @@ -453,7 +491,7 @@ void TransactionDBImpl::RegisterTransaction(Transaction* txn) {
transactions_[txn->GetName()] = txn;
}

void TransactionDBImpl::UnregisterTransaction(Transaction* txn) {
void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
assert(txn);
std::lock_guard<std::mutex> lock(name_map_mutex_);
auto it = transactions_.find(txn->GetName());
Expand Down
Loading

0 comments on commit c9804e0

Please sign in to comment.