Skip to content

Commit

Permalink
MB-35133: Move SyncWrite warmup to EPBucket
Browse files Browse the repository at this point in the history
We want to re-use the SyncWrite warmup code when performing a
rollback as it's simpler than trying to write code to revert
every possible state to the pre-rollback state. Move the
loadPreparedSyncWrites code to EPBucket so that we can re-use it
for rollback.

Change-Id: I89b66fe36ace1d873a26fd92a840bdcfdef00be4
Reviewed-on: http://review.couchbase.org/113025
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
  • Loading branch information
BenHuddleston authored and daverigby committed Aug 8, 2019
1 parent a6312a6 commit 5c6b3ba
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 134 deletions.
126 changes: 126 additions & 0 deletions engines/ep/src/ep_bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

#include "dcp/dcpconnmap.h"

#include <platform/timeutils.h>

#include <gsl.h>

/**
Expand Down Expand Up @@ -1319,6 +1321,130 @@ void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) {
}
}

// Rebuild the set of SyncWrites which were prepared but never resolved on
// disk, by walking the seqno index in order:
// a) each Prepared item encountered is recorded as an outstanding Prepare;
// b) each Committed item (a plain Mutation, or a committed Prepare) cancels
//    any outstanding Prepare recorded for the same key.
// Whatever survives the scan had no Commit persisted, and is registered
// with the Durability Monitor.
void EPBucket::loadPreparedSyncWrites(
        folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) {
    /// Callback invoked for every document the disk scan visits.
    struct PrepareScanCallback : public StatusCallback<GetValue> {
        PrepareScanCallback(EPVBucket& vb) : vb(vb) {
        }

        void callback(GetValue& val) override {
            if (val.item->isPending()) {
                // A Prepare which was not aborted (deleted) - track it as
                // outstanding until we (maybe) see its Commit.
                outstandingPrepares.emplace(val.item->getKey(),
                                            std::move(val.item));
                return;
            }
            if (val.item->isCommitted()) {
                // A Commit supersedes any earlier Prepare for this key -
                // that Prepare is no longer outstanding.
                outstandingPrepares.erase(val.item->getKey());
            }
        }

        EPVBucket& vb;

        /// Document key -> prepare which has not yet been Committed/Aborted.
        std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
                outstandingPrepares;
    };

    auto& epVbucket = dynamic_cast<EPVBucket&>(vb);
    const auto scanStart = std::chrono::steady_clock::now();

    // @TODO MB-34017: We can optimise this by starting the scan at the
    // high_committed_seqno - all earlier prepares would have been committed
    // (or were aborted) and only scanning up to the high prepared seqno.
    const uint64_t startSeqno = 0;

    // Use the RW store: the rollback path which calls this function modifies
    // vbucket_state, and that is only reflected in the RW store. During
    // warmup no writes are permitted at this point anyway.
    auto* rwStore = getRWUnderlyingByShard(epVbucket.getShard()->getId());

    auto diskCb = std::make_shared<PrepareScanCallback>(epVbucket);

    // Nothing is expected to be resident in the HashTable yet, so a
    // NoLookupCallback suffices for the cache-lookup side of the scan.
    auto cacheCb = std::make_shared<NoLookupCallback>();

    // ALL_ITEMS is required (NO_DELETES is insufficient): a committed
    // SyncDelete appears as a prepared_sync_write (doc not deleted on disk)
    // followed by a commit_sync_write which *is* marked deleted - and that
    // Commit must be observed by the scan.
    const auto docFilter = DocumentFilter::ALL_ITEMS;
    const auto valFilter = getValueFilterForCompressionMode();

    auto* ctx = rwStore->initScanContext(
            diskCb, cacheCb, epVbucket.getId(), startSeqno, docFilter, valFilter);

    // A null context indicates a storage problem; the KVStore has already
    // logged the details.
    if (!ctx) {
        EP_LOG_CRITICAL(
                "EPBucket::loadPreparedSyncWrites: scanCtx is null for {}",
                epVbucket.getId());
        return;
    }

    Expects(rwStore->scan(ctx) == scan_success);

    rwStore->destroyScanContext(ctx);

    EP_LOG_DEBUG(
            "EPBucket::loadPreparedSyncWrites: Identified {} outstanding "
            "prepared SyncWrites for {} in {}",
            diskCb->outstandingPrepares.size(),
            epVbucket.getId(),
            cb::time2text(std::chrono::steady_clock::now() - scanStart));

    // Hand every surviving Prepare to the VBucket (HashTable &
    // DurabilityMonitor), sorted by seqno ascending as the
    // DurabilityMonitor requires.
    std::vector<queued_item> sortedPrepares;
    sortedPrepares.reserve(diskCb->outstandingPrepares.size());
    for (auto& entry : diskCb->outstandingPrepares) {
        sortedPrepares.emplace_back(std::move(entry.second));
    }
    std::sort(sortedPrepares.begin(),
              sortedPrepares.end(),
              [](const auto& lhs, const auto& rhs) {
                  return lhs->getBySeqno() < rhs->getBySeqno();
              });

    // The on-disk vbucket_state supplies the HPS/HCS needed to fully
    // resume the DurabilityMonitor.
    const auto* diskVbState = rwStore->getVBucketState(epVbucket.getId());
    if (!diskVbState) {
        throw std::logic_error("EPBucket::loadPreparedSyncWrites: processing " +
                               epVbucket.getId().to_string() +
                               ", but found no vbucket_state");
    }

    epVbucket.loadOutstandingPrepares(
            vbStateLh, *diskVbState, std::move(sortedPrepares));
}

ValueFilter EPBucket::getValueFilterForCompressionMode() {
    // With compression enabled (Active or Passive) keep values in their
    // compressed on-disk form; otherwise ask the KVStore to decompress.
    const auto mode = engine.getCompressionMode();
    return (mode == BucketCompressionMode::Off)
                   ? ValueFilter::VALUES_DECOMPRESSED
                   : ValueFilter::VALUES_COMPRESSED;
}

void EPBucket::notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) {
if (notifyCtx.notifyFlusher) {
notifyFlusher(vbid);
Expand Down
9 changes: 9 additions & 0 deletions engines/ep/src/ep_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ class EPBucket : public KVBucket {

void rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) override;

void loadPreparedSyncWrites(folly::SharedMutex::WriteHolder& vbStateLh,
VBucket& vb) override;

/**
* Returns the ValueFilter to use for KVStore scans, given the bucket
* compression mode.
*/
ValueFilter getValueFilterForCompressionMode();

void notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) override;

virtual bool isGetAllKeysSupported() const override {
Expand Down
7 changes: 2 additions & 5 deletions engines/ep/src/ep_vb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -743,13 +743,10 @@ MutationStatus EPVBucket::insertFromWarmup(Item& itm,
return ht.insertFromWarmup(itm, eject, keyMetaDataOnly, eviction);
}

void EPVBucket::restoreOutstandingPreparesFromWarmup(
void EPVBucket::loadOutstandingPrepares(
const folly::SharedMutex::WriteHolder& vbStateLock,
const vbucket_state& vbs,
std::vector<queued_item>&& outstandingPrepares) {
// About to change the durabilityMonitor object, which is guarded by
// stateLock.
folly::SharedMutex::WriteHolder wlh(getStateLock());

// First insert all prepares into the HashTable, updating their type
// to PreparedMaybeVisible to ensure that the document cannot be read until
// the Prepare is re-committed.
Expand Down
4 changes: 3 additions & 1 deletion engines/ep/src/ep_vb.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,13 @@ class EPVBucket : public VBucket {
* Populates the HashTable and the DurabilityMonitor with the given
* set of queued_items.
*
* @param vbStateLock read lock on the vBucket state.
* @param vbs The vbucket_state read during warmup
* @param outstandingPrepares Sequence of prepared_sync_writes, sorted by
* seqno in ascending order.
*/
void restoreOutstandingPreparesFromWarmup(
void loadOutstandingPrepares(
const folly::SharedMutex::WriteHolder& vbStateLock,
const vbucket_state& vbs,
std::vector<queued_item>&& outstandingPrepares);

Expand Down
5 changes: 5 additions & 0 deletions engines/ep/src/ephemeral_bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ class EphemeralBucket : public KVBucket {
// No op
}

// Intentionally a no-op: the interface contract is to load prepared
// SyncWrites "from disk", and an Ephemeral bucket has no on-disk data
// to load them from.
void loadPreparedSyncWrites(folly::SharedMutex::WriteHolder& vbStateLh,
VBucket& vb) override {
// No op
}

void notifyNewSeqno(const Vbid vbid, const VBNotifyCtx& notifyCtx) override;

/**
Expand Down
9 changes: 9 additions & 0 deletions engines/ep/src/kv_bucket_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,15 @@ class KVBucketIface {
virtual void rollbackUnpersistedItems(VBucket& vb,
int64_t rollbackSeqno) = 0;

/**
* Load the prepared SyncWrites from disk for the given vBucket.
*
* @param vbStateLh vBucket state lock
* @param vb vBucket for which we will load SyncWrites
*/
virtual void loadPreparedSyncWrites(
folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) = 0;

// During the warmup phase we might want to enable external traffic
// at a given point in time.. The LoadStorageKvPairCallback will be
// triggered whenever we want to check if we could enable traffic..
Expand Down
135 changes: 7 additions & 128 deletions engines/ep/src/warmup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@ void logWarmupStats(EPBucket& epstore) {
megabytes_per_seconds);
}

/**
* Returns the ValueFilter to use for KVStore scans, given the bucket
* compression mode.
*/
static ValueFilter getValueFilterForCompressionMode(
const BucketCompressionMode& compressionMode);

//////////////////////////////////////////////////////////////////////////////
// //
// Helper class used to insert data into the epstore //
Expand Down Expand Up @@ -1220,119 +1213,17 @@ void Warmup::scheduleLoadPreparedSyncWrites() {
}

void Warmup::loadPreparedSyncWrites(uint16_t shardId) {
// Perform an in-order scan of the seqno index.
// a) For each Prepared item found, add to a map of outstanding Prepares.
// b) For each Committed (via Mutation or Prepare) item, if there's an
// outstanding Prepare then that prepare has already been Committed,
// hence remove it from the map.
//
// At the end of the scan, all outstanding Prepared items (which did not
// have a Commit persisted to disk) will be registered with the Durability
// Monitor.

/// Disk load callback for scan.
struct LoadSyncWrites : public StatusCallback<GetValue> {
LoadSyncWrites(EPVBucket& vb) : vb(vb) {
}

void callback(GetValue& val) override {
if (val.item->isPending()) {
// Pending item which was not aborted (deleted). Add to
// outstanding Prepare map.
outstandingPrepares.emplace(val.item->getKey(),
std::move(val.item));
return;
}

if (val.item->isCommitted()) {
// Committed item. _If_ there's an outstanding prepared
// SyncWrite, remove it (as it has already been committed).
outstandingPrepares.erase(val.item->getKey());
return;
}
}

EPVBucket& vb;

/// Map of Document key -> outstanding (not yet Committed / Aborted)
/// prepares.
std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
outstandingPrepares;
};

for (const auto vbid : shardVbIds[shardId]) {
const auto start = std::chrono::steady_clock::now();
auto itr = warmedUpVbuckets.find(vbid.get());
if (itr == warmedUpVbuckets.end()) {
continue;
}
auto& epVb = dynamic_cast<EPVBucket&>(*(itr->second));

auto storageCB = std::make_shared<LoadSyncWrites>(epVb);

// Don't expect to find anything already in the HashTable, so use
// NoLookupCallback.
auto cacheCB = std::make_shared<NoLookupCallback>();

// @todo-durability: We can optimise this by starting the scan at the
// high_committed_seqno - all earlier prepares would have been committed
// (or were aborted).
uint64_t startSeqno = 0;

auto* kvStore = store.getROUnderlyingByShard(shardId);
// Use ALL_ITEMS filter for the scan. NO_DELETES is insufficient
// because (committed) SyncDeletes manifest as a prepared_sync_write
// (doc on disk not deleted) followed by a commit_sync_write (which
// *is* marked as deleted as that's the resulting state).
// We need to see that Commit, hence ALL_ITEMS.
const auto docFilter = DocumentFilter::ALL_ITEMS;
const auto valFilter = getValueFilterForCompressionMode(
store.getEPEngine().getCompressionMode());
auto* scanCtx = kvStore->initScanContext(
storageCB, cacheCB, vbid, startSeqno, docFilter, valFilter);

// storage problems can lead to a null context, kvstore logs details
if (!scanCtx) {
EP_LOG_CRITICAL(
"Warmup::loadPreparedSyncWrites: scanCtx is null for {}", vbid);
continue;
}

auto scanResult = kvStore->scan(scanCtx);
Expects(scanResult == scan_success);

kvStore->destroyScanContext(scanCtx);

EP_LOG_DEBUG(
"Warmup::loadPreparedSyncWrites: Identified {} outstanding "
"prepared SyncWrites for {} in {}",
storageCB->outstandingPrepares.size(),
vbid,
cb::time2text(std::chrono::steady_clock::now() - start));

// Insert all outstanding Prepares into the VBucket (HashTable &
// DurabilityMonitor).
std::vector<queued_item> prepares;
for (auto& prepare : storageCB->outstandingPrepares) {
prepares.emplace_back(std::move(prepare.second));
}
// Sequence must be sorted by seqno (ascending) for DurabilityMonitor.
std::sort(prepares.begin(),
prepares.end(),
[](const auto& a, const auto& b) {
return a->getBySeqno() < b->getBySeqno();
});

// Need the HPS/HCS so the DurabilityMonitor can be fully resumed
auto vbState = shardVbStates[shardId].find(vbid);
if (vbState == shardVbStates[shardId].end()) {
throw std::logic_error(
"Warmup::loadPreparedSyncWrites: processing " +
vbid.to_string() + ", but found no vbucket_state");
}
const vbucket_state& vbs = vbState->second;

epVb.restoreOutstandingPreparesFromWarmup(vbs, std::move(prepares));
// Our EPBucket function will do the load for us as we re-use the code
// for rollback.
auto& vb = *(itr->second);
folly::SharedMutex::WriteHolder vbStateLh(vb.getStateLock());
store.loadPreparedSyncWrites(vbStateLh, vb);
}

if (++threadtask_count == store.vbMap.getNumShards()) {
Expand Down Expand Up @@ -1572,16 +1463,6 @@ void Warmup::scheduleLoadingKVPairs()

}

ValueFilter getValueFilterForCompressionMode(
const BucketCompressionMode& compressionMode) {

if (compressionMode != BucketCompressionMode::Off) {
return ValueFilter::VALUES_COMPRESSED;
}

return ValueFilter::VALUES_DECOMPRESSED;
}

void Warmup::loadKVPairsforShard(uint16_t shardId)
{
bool maybe_enable_traffic = false;
Expand All @@ -1597,8 +1478,7 @@ void Warmup::loadKVPairsforShard(uint16_t shardId)
auto cl =
std::make_shared<LoadValueCallback>(store.vbMap, state.getState());

ValueFilter valFilter = getValueFilterForCompressionMode(
store.getEPEngine().getCompressionMode());
ValueFilter valFilter = store.getValueFilterForCompressionMode();

for (const auto vbid : shardVbIds[shardId]) {
ScanContext* ctx = kvstore->initScanContext(cb, cl, vbid, 0,
Expand Down Expand Up @@ -1640,8 +1520,7 @@ void Warmup::loadDataforShard(uint16_t shardId)
auto cl =
std::make_shared<LoadValueCallback>(store.vbMap, state.getState());

ValueFilter valFilter = getValueFilterForCompressionMode(
store.getEPEngine().getCompressionMode());
ValueFilter valFilter = store.getValueFilterForCompressionMode();

for (const auto vbid : shardVbIds[shardId]) {
ScanContext* ctx = kvstore->initScanContext(cb, cl, vbid, 0,
Expand Down

0 comments on commit 5c6b3ba

Please sign in to comment.