Skip to content

Commit

Permalink
MB-35594: Don't return estimate=0 for dcp-takeover stats before backfill
Browse files Browse the repository at this point in the history
The 'dcp-takeover' stats are used by ns_server to estimate how many
mutations are remaining on a DCP stream. However, the estimate value
is not updated until the backfill task has run once (and scanned the
disk file). As such, if 'dcp-takeover' stats are requested before that
first backfil task has run, then they can incorrectly report '0'
backfill items.

To address this, change backfillRemaining to be of type
boost::optional, initialized to an empty optional.

Only when the backfill scan has completed (when the number of items
remaining is determined) is the optional populated.

Then, when stats are requested use a new status value
"calculating-item-count" if the optional is empty (i.e. before scan).

Change-Id: Id7049a0c13a8aab429f137d2f4b293567e360638
Reviewed-on: http://review.couchbase.org/114894
Tested-by: Build Bot <[email protected]>
Reviewed-by: James Harrison <[email protected]>
Reviewed-by: Ben Huddleston <[email protected]>
  • Loading branch information
daverigby committed Sep 18, 2019
1 parent a9a4c3e commit ec02585
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 52 deletions.
47 changes: 32 additions & 15 deletions engines/ep/src/dcp/active_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
isBackfillTaskRunning(false),
pendingBackfill(false),
lastReadSeqno(st_seqno),
backfillRemaining(0),
backfillRemaining(),
lastReadSeqnoUnSnapshotted(st_seqno),
lastSentSeqno(st_seqno),
curChkSeqno(st_seqno),
Expand Down Expand Up @@ -477,6 +477,16 @@ void ActiveStream::setVBucketStateAckRecieved() {
notifyStreamReady();
}

void ActiveStream::setBackfillRemaining(size_t value) {
std::lock_guard<std::mutex> guard(streamMutex);
backfillRemaining = value;
}

void ActiveStream::clearBackfillRemaining() {
std::lock_guard<std::mutex> guard(streamMutex);
backfillRemaining.reset();
}

std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
std::lock_guard<std::mutex>& lh) {
auto resp = nextQueuedItem();
Expand Down Expand Up @@ -505,15 +515,15 @@ std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
// Only DcpResponse objects representing items from "disk" have a size
// so only update backfillRemaining when non-zero
if (resp->getApproximateSize()) {
if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
Expects(backfillRemaining.is_initialized());
if (*backfillRemaining > 0) {
(*backfillRemaining)--;
}
}
}

if (!isBackfillTaskRunning && readyQ.empty()) {
// Given readyQ.empty() is True resp will be NULL
backfillRemaining.store(0, std::memory_order_relaxed);
// The previous backfill has completed. Check to see if another
// backfill needs to be scheduled.
if (pendingBackfill) {
Expand Down Expand Up @@ -747,20 +757,24 @@ void ActiveStream::addTakeoverStats(const AddStatFn& add_stat,
return;
}

size_t total = backfillRemaining.load(std::memory_order_relaxed);
if (backfillRemaining == 0) {
Expects(!isPending());
}

size_t total = 0;
const char* status = nullptr;
if (isBackfilling()) {
add_casted_stat("status", "backfilling", add_stat, cookie);
if (backfillRemaining) {
status = "backfilling";
total += *backfillRemaining;
} else {
status = "calculating-item-count";
}
} else {
add_casted_stat("status", "in-memory", add_stat, cookie);
status = "in-memory";
}
add_casted_stat("status", status, add_stat, cookie);

if (backfillRemaining) {
add_casted_stat(
"backfillRemaining", *backfillRemaining, add_stat, cookie);
}
add_casted_stat("backfillRemaining",
backfillRemaining.load(std::memory_order_relaxed),
add_stat,
cookie);

size_t vb_items = vb.getNumItems();
size_t chk_items = 0;
Expand Down Expand Up @@ -1422,6 +1436,9 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
producer->scheduleBackfillManager(
*vbucket, shared_from_this(), backfillStart, backfillEnd);
isBackfillTaskRunning.store(true);
/// Number of backfill items is unknown until the Backfill task
/// completes the scan phase - reset backfillRemaining counter.
backfillRemaining.reset();
} else {
if (reschedule) {
// Infrequent code path, see comment below.
Expand Down
20 changes: 12 additions & 8 deletions engines/ep/src/dcp/active_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ class ActiveStream : public Stream,

void setVBucketStateAckRecieved();

void incrBackfillRemaining(size_t by) {
backfillRemaining.fetch_add(by, std::memory_order_relaxed);
}
/// Set the number of backfill items remaining to the given value.
void setBackfillRemaining(size_t value);

/// Clears the number of backfill items remaining, setting to an empty
/// (unknown) value.
void clearBackfillRemaining();

void markDiskSnapshot(uint64_t startSeqno,
uint64_t endSeqno,
Expand Down Expand Up @@ -352,12 +355,13 @@ class ActiveStream : public Stream,
snapshotted and put onto readyQ */
AtomicMonotonic<uint64_t, ThrowExceptionPolicy> lastReadSeqno;

/* backfillRemaining is a stat recording the amount of
* items remaining to be read from disk. It is an atomic
* because otherwise the function incrBackfillRemaining
* must acquire the streamMutex lock.
/* backfillRemaining is a stat recording the amount of items remaining to
* be read from disk.
* Before the number of items to be backfilled has been determined (disk
* scanned) it is empty.
* Guarded by streamMutex.
*/
std::atomic<size_t> backfillRemaining;
boost::optional<size_t> backfillRemaining;

std::unique_ptr<DcpResponse> backfillPhase(std::lock_guard<std::mutex>& lh);

Expand Down
2 changes: 1 addition & 1 deletion engines/ep/src/dcp/backfill_disk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ backfill_status_t DCPBackfillDisk::create() {
stream->setDead(status);
transitionState(backfill_state_done);
} else {
stream->incrBackfillRemaining(scanCtx->documentCount);
stream->setBackfillRemaining(scanCtx->documentCount);
stream->markDiskSnapshot(
startSeqno, scanCtx->maxSeqno, scanCtx->highCompletedSeqno);
transitionState(backfill_state_scanning);
Expand Down
4 changes: 2 additions & 2 deletions engines/ep/src/dcp/backfill_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ backfill_status_t DCPBackfillMemoryBuffered::create() {
remaining count */
while (rangeItr.curr() != rangeItr.end()) {
if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
/* Incr backfill remaining
/* Set backfill remaining
[EPHE TODO]: This will be inaccurate if do not backfill till end
of the iterator
*/
stream->incrBackfillRemaining(rangeItr.count());
stream->setBackfillRemaining(rangeItr.count());

/* Determine the endSeqno of the current snapshot.
We want to send till requested endSeqno, but if that cannot
Expand Down
19 changes: 0 additions & 19 deletions engines/ep/tests/ep_test_apis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1641,25 +1641,6 @@ void wait_for_stat_to_be_gte(EngineIface* h,
}
}

void wait_for_stat_to_be_lte(EngineIface* h,
const char* stat,
int final,
const char* stat_key,
const time_t max_wait_time_in_secs) {
useconds_t sleepTime = 128;
WaitTimeAccumulator<int> accumulator("to be less than or equal to", stat,
stat_key, final,
max_wait_time_in_secs);
for (;;) {
auto current = get_int_stat(h, stat, stat_key);
if (current <= final) {
break;
}
accumulator.incrementAndAbortIfLimitReached(current, sleepTime);
decayingSleep(&sleepTime);
}
}

void wait_for_expired_items_to_be(EngineIface* h,
int final,
const time_t max_wait_time_in_secs) {
Expand Down
27 changes: 26 additions & 1 deletion engines/ep/tests/ep_test_apis.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,14 @@ void wait_for_stat_to_be_gte(EngineIface* h,
int final,
const char* stat_key = NULL,
const time_t max_wait_time_in_secs = 60);

template <typename T>
void wait_for_stat_to_be_lte(EngineIface* h,
const char* stat,
int final,
T final,
const char* stat_key = NULL,
const time_t max_wait_time_in_secs = 60);

void wait_for_expired_items_to_be(EngineIface* h,
int final,
const time_t max_wait_time_in_secs = 60);
Expand Down Expand Up @@ -677,6 +680,28 @@ void wait_for_stat_to_be(EngineIface* h,
}
}

template <typename T>
void wait_for_stat_to_be_lte(EngineIface* h,
const char* stat,
T final,
const char* stat_key,
const time_t max_wait_time_in_secs) {
useconds_t sleepTime = 128;
WaitTimeAccumulator<T> accumulator("to be less than or equal to",
stat,
stat_key,
final,
max_wait_time_in_secs);
for (;;) {
auto current = get_stat<T>(h, stat, stat_key);
if (current <= final) {
break;
}
accumulator.incrementAndAbortIfLimitReached(current, sleepTime);
decayingSleep(&sleepTime);
}
}

/**
* Function that does an exponential wait for a 'val' to reach 'expected'
*
Expand Down
6 changes: 2 additions & 4 deletions engines/ep/tests/ep_testsuite_dcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,8 @@ ENGINE_ERROR_CODE TestDcpConsumer::openStreams() {
std::stringstream stats_takeover;
stats_takeover << "dcp-vbtakeover " << ctx.vbucket.get() << " "
<< name.c_str();
wait_for_stat_to_be_lte(h,
"estimate",
static_cast<int>(est),
stats_takeover.str().c_str());
wait_for_stat_to_be_lte(
h, "estimate", est, stats_takeover.str().c_str());
}

if (ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {
Expand Down
2 changes: 1 addition & 1 deletion engines/ep/tests/mock/mock_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class MockActiveStream : public ActiveStream {
return lastReadSeqno;
}

int getNumBackfillItemsRemaining() const {
boost::optional<size_t> getNumBackfillItemsRemaining() const {
return backfillRemaining;
}

Expand Down
81 changes: 80 additions & 1 deletion engines/ep/tests/module_tests/dcp_stream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ TEST_P(StreamTest, BackfillOnly) {

// Check that backfill stats have been updated correctly
EXPECT_EQ(numItems, stream->getNumBackfillItems());
EXPECT_EQ(numItems, stream->getNumBackfillItemsRemaining());
EXPECT_EQ(numItems, *stream->getNumBackfillItemsRemaining());

destroy_dcp_stream();
}
Expand Down Expand Up @@ -1554,6 +1554,85 @@ TEST_P(SingleThreadedActiveStreamTest, DiskSnapshotSendsChkMarker) {
producer->cancelCheckpointCreatorTask();
}

/// Test that disk backfill remaining isn't prematurely zero (before counts
/// read from disk by backfill task).
TEST_P(SingleThreadedActiveStreamTest, DiskBackfillInitializingItemsRemaining) {
auto vb = engine->getVBucket(vbid);
auto& ckptMgr = *vb->checkpointManager;

// Delete initial stream (so we can re-create after items are only available
// from disk.
stream.reset();

// Store 3 items (to check backfill remaining counts).
// Add items, flush it to disk, then clear checkpoint to force backfill.
store_item(vbid, makeStoredDocKey("key1"), "value");
store_item(vbid, makeStoredDocKey("key2"), "value");
store_item(vbid, makeStoredDocKey("key3"), "value");
ckptMgr.createNewCheckpoint();

flushVBucketToDiskIfPersistent(vbid, 3);

bool newCKptCreated;
ASSERT_EQ(3, ckptMgr.removeClosedUnrefCheckpoints(*vb, newCKptCreated));

// Re-create producer now we have items only on disk.
setupProducer();
ASSERT_TRUE(stream->isBackfilling());

// Should report empty itemsRemaining as that would mislead
// ns_server if they asked for stats before the backfill task runs (they
// would think backfill is complete).
EXPECT_FALSE(stream->getNumBackfillItemsRemaining());

bool statusFound = false;
auto checkStatusFn = [&statusFound](const char* key,
const uint16_t klen,
const char* val,
const uint32_t vlen,
gsl::not_null<const void*> cookie) {
if (std::string(key, klen) == "status") {
EXPECT_EQ(std::string(reinterpret_cast<const char*>(cookie.get())),
std::string(val, vlen));
statusFound = true;
}
};

// Should report status == "calculating_item_count" before backfill
// scan has occurred.
stream->addTakeoverStats(checkStatusFn, "calculating-item-count", *vb);
EXPECT_TRUE(statusFound);

// Run the backfill we scheduled when we transitioned to the backfilling
// state. Run the backfill task once to get initial item counts.
auto& bfm = producer->getBFM();
bfm.backfill();
EXPECT_EQ(3, *stream->getNumBackfillItemsRemaining());
// Should report status == "backfilling"
statusFound = false;
stream->addTakeoverStats(checkStatusFn, "backfilling", *vb);
EXPECT_TRUE(statusFound);

// Run again to actually scan (items remaining unchanged).
bfm.backfill();
EXPECT_EQ(3, *stream->getNumBackfillItemsRemaining());
statusFound = false;
stream->addTakeoverStats(checkStatusFn, "backfilling", *vb);
EXPECT_TRUE(statusFound);

// Finally run again to complete backfill (so it is shutdown in a clean
// fashion).
bfm.backfill();

// Consume the items from backfill; should update items remaining.
// Actually need to consume 4 items (snapshot_marker + 3x mutation).
stream->consumeBackfillItems(4);
EXPECT_EQ(0, *stream->getNumBackfillItemsRemaining());
statusFound = false;
stream->addTakeoverStats(checkStatusFn, "in-memory", *vb);
EXPECT_TRUE(statusFound);
}

/*
* MB-31410: In this test I simulate a DcpConsumer that receives messages
* while previous messages have been buffered. This simulates the system
Expand Down

0 comments on commit ec02585

Please sign in to comment.