Skip to content

Commit

Permalink
MB-43622: Fix race between DCP Open and scheduleDcpStep
Browse files Browse the repository at this point in the history
Previously there was a possibility of a race for DCP
connections by the fact that DCP Open would create the handler
but it wouldn't set the state for the connection to DCP until
the method returned. In the mean time the cookie was available
for DCP to try to schedule a step.

This change sets the connection up as a DCP connection as part
of creating the the ConnHandler object. Unfortunately we can't
reserve the cookie as part of the constructor without a massive
change working through all of the unit tests as they don't clean
up the reference by calling release.

To work around the race condition we'll add an extra ref count
to the cookie before calling DCP Open

Change-Id: Ifa9b87b984af6be53934c8100e1a8d584c423c13
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/143492
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
  • Loading branch information
trondn authored and daverigby committed Jan 19, 2021
1 parent 4df733f commit c4c7499
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 24 deletions.
4 changes: 0 additions & 4 deletions daemon/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,6 @@ nlohmann::json Connection::toJSON() const {
return ret;
}

void Connection::setDCP(bool enable) {
dcp = enable;
}

void Connection::restartAuthentication() {
if (authenticated && user.domain == cb::sasl::Domain::External) {
externalAuthManager->logoff(user.name);
Expand Down
7 changes: 1 addition & 6 deletions daemon/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,9 @@ class Connection : public DcpMessageProducersIface {
void setTerminationReason(std::string reason);

bool isDCP() const {
return dcp;
return dcpConnHandlerIface.load() != nullptr;
}

void setDCP(bool enable);

bool isDcpXattrAware() const {
return dcpXattrAware;
}
Expand Down Expand Up @@ -964,9 +962,6 @@ class Connection : public DcpMessageProducersIface {
/// The reason why the session was terminated
std::string terminationReason;

/** Is this connection used by a DCP connection? */
bool dcp = false;

/** Is this DCP channel XAttrAware */
bool dcpXattrAware = false;

Expand Down
2 changes: 1 addition & 1 deletion daemon/cookie.h
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ class Cookie : public cb::tracing::Traceable {
/// Previously reserve would lock the connection, but with OOO we
/// might have multiple cookies in flight and needs to be able to
/// lock them independently
uint8_t refcount = 0;
std::atomic<uint8_t> refcount = 0;

/// see isAuthorized/setAuthorized
bool authorized = false;
Expand Down
52 changes: 43 additions & 9 deletions daemon/protocol/mcbp/dcp_open_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,42 @@ void dcp_open_executor(Cookie& cookie) {
auto key = request.getKey();
auto value = request.getValue();

ret = dcpOpen(
cookie,
request.getOpaque(),
payload->getSeqno(),
flags,
{reinterpret_cast<const char*>(key.data()), key.size()},
{reinterpret_cast<const char*>(value.data()),
value.size()});
// MB-43622 There is a race condition in the creation and
// notification of the DCP connections. Initially
// I tried to reserve the cookie from the constructor
// of the ConnHandler, but that caused a ton of problems
// in the unit tests as they didn't explicitly release
// the reference (and trying to clean up all of that was
// a lot of work). In addition to that we could end up
// with a memory allocation failure trying to insert
// the new cookie in the connection array which would
// also make it hard to figure out when to release
// the reference (and the engine is not allowed to call
// release from a workerthread as it tries to reschedule
// the cookie). The workaround for used is to bump
// the refcount before calling DCP Open so that the
// the checks in scheduleDcpStep can see that the ref
// count is correct if we get a notification before
// this thread call reserve.
cookie.incrementRefcount();
try {
ret = dcpOpen(
cookie,
request.getOpaque(),
payload->getSeqno(),
flags,
{reinterpret_cast<const char*>(key.data()), key.size()},
{reinterpret_cast<const char*>(value.data()),
value.size()});
} catch (const std::exception& e) {
LOG_WARNING(
"{}: Received an exception as part DCP Open: {}, "
"disconnect client",
connection.getId(),
e.what());
ret = ENGINE_DISCONNECT;
}
cookie.decrementRefcount();
}
}

Expand All @@ -74,9 +102,15 @@ void dcp_open_executor(Cookie& cookie) {
connection.setDcpDeletedUserXattr(dcpDeletedUserXattr);
connection.setDcpNoValue(dcpNoValue);
connection.setDcpDeleteTimeEnabled(dcpDeleteTimes);
connection.setDCP(true);
connection.disableSaslAuth();

if (!connection.getDcpConnHandlerIface()) {
throw std::logic_error(
"dcp_open_executor(): The underlying engine returned "
"success but did not set up a DCP connection handler "
"interface");
}

// String buffer with max length = total length of all possible contents
std::string logBuffer;

Expand Down
4 changes: 3 additions & 1 deletion engines/ep/src/connhandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ ConnHandler::ConnHandler(EventuallyPersistentEngine& e,
logger = BucketLogger::createBucketLogger(
std::to_string(reinterpret_cast<uintptr_t>(this)));

auto connId = e.getServerApi()->cookie->get_log_info(c).first;
auto* cookie_api = e.getServerApi()->cookie;
cookie_api->setDcpConnHandler(c, this);
auto connId = cookie_api->get_log_info(c).first;
logger->setConnectionId(connId);
}

Expand Down
2 changes: 0 additions & 2 deletions engines/ep/src/ep_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6195,8 +6195,6 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::dcpOpen(

// Success creating dcp object which has stored the cookie, now reserve it.
reserveCookie(cookie);
setDcpConnHandler(cookie, handler);

return ENGINE_SUCCESS;
}

Expand Down
6 changes: 5 additions & 1 deletion engines/ewouldblock_engine/ewouldblock_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ class BlockMonitorThread : public Couchbase::Thread {
};

/** ewouldblock_engine class */
class EWB_Engine : public EngineIface, public DcpIface {
class EWB_Engine : public EngineIface,
public DcpIface,
public DcpConnHandlerIface {
private:
enum class Cmd {
NONE,
Expand Down Expand Up @@ -1387,6 +1389,8 @@ ENGINE_ERROR_CODE EWB_Engine::open(gsl::not_null<const void*> cookie,
dcp_stream[cookie] =
std::make_pair(false, std::numeric_limits<uint64_t>::max());
}

real_api->cookie->setDcpConnHandler(cookie, this);
return ENGINE_SUCCESS;
}

Expand Down

0 comments on commit c4c7499

Please sign in to comment.