diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp index 9c4ec5cf072..e23ad994ade 100644 --- a/fdbclient/ServerKnobs.cpp +++ b/fdbclient/ServerKnobs.cpp @@ -1118,6 +1118,9 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi init( FETCH_SHARD_BUFFER_BYTE_LIMIT, 20e6 ); if( randomize && BUGGIFY ) FETCH_SHARD_BUFFER_BYTE_LIMIT = 1; init( FETCH_SHARD_UPDATES_BYTE_LIMIT, 2500000 ); if( randomize && BUGGIFY ) FETCH_SHARD_UPDATES_BYTE_LIMIT = 100; + // Storage Server with Physical Shard + init( SS_GET_DATA_MOVE_ID, false); if ( isSimulated ) SS_GET_DATA_MOVE_ID = SHARD_ENCODE_LOCATION_METADATA; + //Wait Failure init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2; init( WAIT_FAILURE_DELAY_LIMIT, 1.0 ); if( randomize && BUGGIFY ) WAIT_FAILURE_DELAY_LIMIT = 5.0; diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h index b46d1ee9e19..4b5cf3d4853 100644 --- a/fdbclient/include/fdbclient/ServerKnobs.h +++ b/fdbclient/include/fdbclient/ServerKnobs.h @@ -1157,6 +1157,9 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl bulkLoadTaskState; // set if the data move is a bulk load data move + Optional> dcTeamIds; // map of dcId to teamId DataMoveMetaData() = default; DataMoveMetaData(UID id, Version version, KeyRange range) : id(id), version(version), priority(0), mode(0) { @@ -202,7 +204,8 @@ struct DataMoveMetaData { template void serialize(Ar& ar) { - serializer(ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState); + serializer( + ar, id, version, ranges, priority, src, dest, checkpoints, phase, mode, bulkLoadTaskState, dcTeamIds); } }; diff --git a/fdbclient/include/fdbclient/StorageServerShard.h b/fdbclient/include/fdbclient/StorageServerShard.h index cbaf5bb766e..1769f17e556 100644 --- a/fdbclient/include/fdbclient/StorageServerShard.h +++ 
b/fdbclient/include/fdbclient/StorageServerShard.h @@ -44,18 +44,27 @@ struct StorageServerShard { const uint64_t id, const uint64_t desiredId, ShardState shardState, - Optional moveInShardId) + UID moveInShardId) : range(range), version(version), id(id), desiredId(desiredId), shardState(shardState), moveInShardId(moveInShardId) {} StorageServerShard(KeyRange range, Version version, const uint64_t id, const uint64_t desiredId, - ShardState shardState) - : range(range), version(version), id(id), desiredId(desiredId), shardState(shardState) {} + ShardState shardState, + std::string teamId) + : range(range), version(version), id(id), desiredId(desiredId), shardState(shardState), teamId(teamId) { + if (shardState != NotAssigned) { + ASSERT_ABORT(id != 0UL); + ASSERT_ABORT(desiredId != 0UL); + } + if (shardState == ReadWrite && version != 0) { + ASSERT_ABORT(!teamId.empty()); + } + } static StorageServerShard notAssigned(KeyRange range, Version version = 0) { - return StorageServerShard(range, version, 0, 0, NotAssigned); + return StorageServerShard(range, version, 0, 0, NotAssigned, (std::string) ""); } ShardState getShardState() const { return static_cast(this->shardState); }; @@ -86,7 +95,7 @@ struct StorageServerShard { std::string res = "StorageServerShard: [Range]: " + Traceable::toString(range) + " [Shard ID]: " + format("%016llx", this->id) + " [Version]: " + std::to_string(version) + " [State]: " + getShardStateString() + - " [Desired Shard ID]: " + format("%016llx", this->desiredId); + " [Desired Shard ID]: " + format("%016llx", this->desiredId) + " [ Team ID ]: " + teamId; if (moveInShardId.present()) { res += " [MoveInShard ID]: " + this->moveInShardId.get().toString(); } @@ -95,7 +104,7 @@ struct StorageServerShard { template void serialize(Ar& ar) { - serializer(ar, range, version, id, desiredId, shardState, moveInShardId); + serializer(ar, range, version, id, desiredId, shardState, moveInShardId, teamId); } KeyRange range; @@ -104,6 +113,7 @@ struct 
StorageServerShard { uint64_t desiredId; // The intended shard ID. int8_t shardState; Optional moveInShardId; // If present, it is the associated MoveInShardMetaData. + std::string teamId; }; #endif diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp index 7bb3462f0db..77438d13df4 100644 --- a/fdbserver/MoveKeys.actor.cpp +++ b/fdbserver/MoveKeys.actor.cpp @@ -1712,7 +1712,7 @@ ACTOR static Future startMoveShards(Database occ, serverListEntries.push_back(tr.get(serverListKeyFor(servers[s]))); } std::vector> serverListValues = wait(getAll(serverListEntries)); - + state std::unordered_map> dcServers; for (int s = 0; s < serverListValues.size(); s++) { if (!serverListValues[s].present()) { // Attempt to move onto a server that isn't in serverList (removed or never added to the @@ -1721,6 +1721,13 @@ ACTOR static Future startMoveShards(Database occ, // TODO(psm): Mark the data move as 'deleting'. throw move_to_removed_server(); } + auto si = decodeServerListValue(serverListValues[s].get()); + ASSERT(si.id() == servers[s]); + auto it = dcServers.find(si.locality.describeDcId()); + if (it == dcServers.end()) { + dcServers[si.locality.describeDcId()] = std::vector(); + } + dcServers[si.locality.describeDcId()].push_back(si.id().shortString()); } currentKeys = KeyRangeRef(begin, keys.end); @@ -1733,6 +1740,15 @@ ACTOR static Future startMoveShards(Database occ, state Key endKey = old.back().key; currentKeys = KeyRangeRef(currentKeys.begin, endKey); + if (ranges.front() != currentKeys) { + TraceEvent("MoveShardsPartialRange") + .detail("ExpectedRange", ranges.front()) + .detail("ActualRange", currentKeys) + .detail("DataMoveId", dataMoveId) + .detail("RowLimit", SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT) + .detail("ByteLimit", SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT); + } + // Check that enough servers for each shard are in the correct state state RangeResult UIDtoTagMap = wait(tr.getRange(serverTagKeys, CLIENT_KNOBS->TOO_MANY)); ASSERT(!UIDtoTagMap.more 
&& UIDtoTagMap.size() < CLIENT_KNOBS->TOO_MANY); @@ -1806,6 +1822,7 @@ ACTOR static Future startMoveShards(Database occ, TraceEvent( SevWarn, "StartMoveShardsCancelConflictingDataMove", relocationIntervalId) .detail("Range", rangeIntersectKeys) + .detail("CurrentDataMoveRange", ranges[0]) .detail("DataMoveID", dataMoveId.toString()) .detail("ExistingDataMoveID", destId.toString()); wait(cleanUpDataMove(occ, destId, lock, startMoveKeysLock, keys, ddEnabledState)); @@ -1868,6 +1885,20 @@ ACTOR static Future startMoveShards(Database occ, dataMove.ranges.clear(); dataMove.ranges.push_back(KeyRangeRef(keys.begin, currentKeys.end)); dataMove.dest.insert(servers.begin(), servers.end()); + dataMove.dcTeamIds = std::unordered_map(); + for (auto& [dc, serverIds] : dcServers) { + std::sort(serverIds.begin(), serverIds.end()); + std::string teamId; + for (const auto& serverId : serverIds) { + if (teamId.size() == 0) { + teamId = serverId; + } else { + teamId += "," + serverId; + } + } + // Use the concatenated server ids as the team id to avoid conflicts. 
+ dataMove.dcTeamIds.get()[dc] = teamId; + } } if (currentKeys.end == keys.end) { @@ -2355,7 +2386,9 @@ ACTOR static Future finishMoveShards(Database occ, dataMove.bulkLoadTaskState = newBulkLoadTaskState; } wait(deleteCheckpoints(&tr, dataMove.checkpoints, dataMoveId)); - tr.clear(dataMoveKeyFor(dataMoveId)); + // tr.clear(dataMoveKeyFor(dataMoveId)); + dataMove.phase = DataMoveMetaData::Completed; + tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(dataMove)); TraceEvent(sevDm, "FinishMoveShardsDeleteMetaData", relocationIntervalId) .detail("DataMove", dataMove.toString()); } else if (!bulkLoadTaskState.present()) { diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index 744119c8f2f..0b1977f806e 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -401,6 +401,7 @@ struct AddingShard : NonCopyable { Promise readWrite; DataMovementReason reason; SSBulkLoadMetadata ssBulkLoadMetadata; + std::string teamId; // During the Fetching phase, it saves newer mutations whose version is greater or equal to fetchClient's // fetchVersion, while the shard is still busy catching up with fetchClient. 
It applies these updates after fetching @@ -431,12 +432,13 @@ struct AddingShard : NonCopyable { AddingShard(StorageServer* server, KeyRangeRef const& keys, DataMovementReason reason, - const SSBulkLoadMetadata& ssBulkLoadMetadata); + const SSBulkLoadMetadata& ssBulkLoadMetadata, + std::string teamId); // When fetchKeys "partially completes" (splits an adding shard in two), this is used to construct the left half AddingShard(AddingShard* prev, KeyRange const& keys) : keys(keys), fetchClient(prev->fetchClient), server(prev->server), transferredVersion(prev->transferredVersion), - fetchVersion(prev->fetchVersion), phase(prev->phase), reason(prev->reason), + fetchVersion(prev->fetchVersion), phase(prev->phase), reason(prev->reason), teamId(prev->teamId), ssBulkLoadMetadata(prev->ssBulkLoadMetadata) {} ~AddingShard() { if (!fetchComplete.isSet()) @@ -457,8 +459,14 @@ struct AddingShard : NonCopyable { }; class ShardInfo : public ReferenceCounted, NonCopyable { - ShardInfo(KeyRange keys, std::unique_ptr&& adding, StorageServer* readWrite) - : adding(std::move(adding)), readWrite(readWrite), keys(keys), shardId(0LL), desiredShardId(0LL), version(0) {} + ShardInfo(KeyRange keys, std::unique_ptr&& prevAdding) + : keys(keys), shardId(0LL), desiredShardId(0LL), version(0) { + ASSERT(prevAdding); + adding = std::move(prevAdding); + teamId = adding->teamId; + } + ShardInfo(KeyRange keys) : keys(keys) {} + ShardInfo(KeyRange keys, StorageServer* data) : keys(keys), readWrite(data) { ASSERT(data); } ShardInfo(KeyRange keys, std::shared_ptr moveInShard) : adding(nullptr), readWrite(nullptr), moveInShard(moveInShard), keys(keys), shardId(moveInShard->meta->destShardId()), desiredShardId(moveInShard->meta->destShardId()), @@ -467,24 +475,29 @@ class ShardInfo : public ReferenceCounted, NonCopyable { public: // A shard has 4 mutual exclusive states: adding, moveInShard, readWrite and notAssigned. 
std::unique_ptr adding; - struct StorageServer* readWrite; + struct StorageServer* readWrite = nullptr; std::shared_ptr moveInShard; // The shard is being moved in via physical-shard-move. KeyRange keys; - uint64_t changeCounter; - uint64_t shardId; - uint64_t desiredShardId; - Version version; + uint64_t changeCounter = 0UL; + uint64_t shardId = 0UL; + uint64_t desiredShardId = 0UL; + UID dataMoveId; + std::string teamId = ""; + Version version = 0; - static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys, nullptr, nullptr); } - static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, nullptr, data); } + static ShardInfo* newNotAssigned(KeyRange keys) { return new ShardInfo(keys); } + static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, data); } static ShardInfo* newAdding(StorageServer* data, KeyRange keys, DataMovementReason reason, - const SSBulkLoadMetadata& ssBulkLoadMetadata) { - return new ShardInfo(keys, std::make_unique(data, keys, reason, ssBulkLoadMetadata), nullptr); + const SSBulkLoadMetadata& ssBulkLoadMetadata, + std::string teamId) { + return new ShardInfo(keys, std::make_unique(data, keys, reason, ssBulkLoadMetadata, teamId)); } static ShardInfo* addingSplitLeft(KeyRange keys, AddingShard* oldShard) { - return new ShardInfo(keys, std::make_unique(oldShard, keys), nullptr); + auto shardInfo = new ShardInfo(keys, std::make_unique(oldShard, keys)); + shardInfo->teamId = oldShard->teamId; + return shardInfo; } static ShardInfo* newShard(StorageServer* data, const StorageServerShard& shard); @@ -502,7 +515,7 @@ class ShardInfo : public ReferenceCounted, NonCopyable { StorageServerShard toStorageServerShard() const { StorageServerShard::ShardState st = StorageServerShard::NotAssigned; - Optional moveInShardId; + UID moveInShardId = anonymousShardId; if (this->isReadable()) { st = StorageServerShard::ReadWrite; } else if (!this->assigned()) { @@ -527,7 
+540,24 @@ class ShardInfo : public ReferenceCounted, NonCopyable { moveInShardId = this->moveInShard->id(); } } - return StorageServerShard(this->keys, this->version, this->shardId, this->desiredShardId, st, moveInShardId); + + StorageServerShard ssShard; + if (moveInShardId != anonymousShardId) { + ssShard = + StorageServerShard(this->keys, this->version, this->shardId, this->desiredShardId, st, moveInShardId); + ssShard.teamId = teamId; + } else { + ssShard = StorageServerShard(this->keys, this->version, this->shardId, this->desiredShardId, st, teamId); + } + if (isReadable()) { + ASSERT(!ssShard.teamId.empty()); + } + + if (st != StorageServerShard::NotAssigned) { + ASSERT(ssShard.id != 0UL); + ASSERT(ssShard.desiredId != 0UL); + } + return ssShard; } // Copies necessary information from `shard`. @@ -535,6 +565,7 @@ class ShardInfo : public ReferenceCounted, NonCopyable { this->version = shard.version; this->shardId = shard.id; this->desiredShardId = shard.desiredId; + this->teamId = shard.teamId; } // Returns true if the current shard is merged with `other`. @@ -547,6 +578,13 @@ class ShardInfo : public ReferenceCounted, NonCopyable { return true; } + void setTeamId(const std::string& id) { + teamId = id; + if (adding != nullptr) { + adding->teamId = teamId; + } + } + void validate() const { // TODO: Complete this. } @@ -572,6 +610,16 @@ class ShardInfo : public ReferenceCounted, NonCopyable { (moveInShard && moveInShard->fetchComplete.isSet()); } + bool isFetching() const { + if (adding) { + return !adding->fetchComplete.isSet(); + } + if (moveInShard) { + return !moveInShard->fetchComplete.isSet(); + } + return false; + } + std::string debugDescribeState() const { if (notAssigned()) { return "NotAssigned"; @@ -911,97 +959,6 @@ struct BusiestWriteTagContext { busiestWriteTagEventHolder(makeReference(busiestWriteTagTrackingKey)), lastUpdateTime(-1) {} }; -// A SSPhysicalShard represents a physical shard, it contains a list of keyranges. 
-class SSPhysicalShard { -public: - SSPhysicalShard(const int64_t id) : id(id) {} - - void addRange(Reference shard); - - // Remove the shard if a shard to the same pointer (ShardInfo*) exists. - void removeRange(Reference shard); - - // Clear all shards overlapping with `range`. - void removeRange(KeyRangeRef range); - - bool supportCheckpoint() const; - - bool hasRange(Reference shard) const; - - int size() const { return ranges.size(); } - // Public function to iterate over the ranges - std::vector>::const_iterator begin() const { return ranges.begin(); } - - std::vector>::const_iterator end() const { return ranges.end(); } - -private: - const int64_t id; - std::vector> ranges; -}; - -void SSPhysicalShard::addRange(Reference shard) { - TraceEvent(SevVerbose, "SSPhysicalShardAddShard") - .detail("ShardID", format("%016llx", this->id)) - .detail("Assigned", !shard->notAssigned()) - .detail("Range", shard->keys); - ASSERT(!shard->notAssigned()); - - removeRange(shard->keys); - - ranges.push_back(shard); -} - -void SSPhysicalShard::removeRange(Reference shard) { - TraceEvent(SevVerbose, "SSPhysicalShardRemoveShard") - .detail("ShardID", format("%016llx", this->id)) - .detail("Assigned", !shard->notAssigned()) - .detail("Range", shard->keys); - - for (int i = 0; i < this->ranges.size(); ++i) { - const auto& r = this->ranges[i]; - if (r.getPtr() == shard.getPtr()) { - this->ranges[i] = this->ranges.back(); - this->ranges.pop_back(); - return; - } - } -} - -void SSPhysicalShard::removeRange(KeyRangeRef range) { - TraceEvent(SevVerbose, "SSPhysicalShardRemoveRange") - .detail("ShardID", format("%016llx", this->id)) - .detail("Range", range); - for (int i = 0; i < this->ranges.size();) { - const auto& r = this->ranges[i]; - if (r->keys.intersects(range)) { - this->ranges[i] = this->ranges.back(); - this->ranges.pop_back(); - } else { - ++i; - } - } -} - -bool SSPhysicalShard::supportCheckpoint() const { - for (const auto& r : this->ranges) { - ASSERT(r->desiredShardId == 
this->id); - if (r->shardId != this->id) { - return false; - } - } - return true; -} - -bool SSPhysicalShard::hasRange(Reference shard) const { - for (int i = 0; i < this->ranges.size(); ++i) { - if (this->ranges[i].getPtr() == shard.getPtr()) { - return true; - } - } - - return false; -} - struct TenantSSInfo { constexpr static FileIdentifier file_identifier = 3253114; TenantAPI::TenantLockState lockState; @@ -1065,7 +1022,8 @@ struct StorageServer : public IStorageMetricsService { public: struct PendingNewShard { - PendingNewShard(uint64_t shardId, KeyRangeRef range) : shardId(format("%016llx", shardId)), range(range) {} + PendingNewShard(uint64_t shardId, UID dataMoveId, KeyRangeRef range, Reference shardInfo) + : shardId(format("%016llx", shardId)), dataMoveId(dataMoveId), range(range), shardInfo(shardInfo) {} std::string toString() const { return fmt::format("PendingNewShard: [ShardID]: {} [Range]: {}", @@ -1074,21 +1032,21 @@ struct StorageServer : public IStorageMetricsService { } std::string shardId; + UID dataMoveId; KeyRange range; + Reference shardInfo; }; std::map> pendingCheckpoints; // Pending checkpoint requests std::unordered_map checkpoints; // Existing and deleting checkpoints std::unordered_map liveCheckpointReaders; // Active checkpoint readers VersionedMap tenantMap; - std::map> - pendingAddRanges; // Pending requests to add ranges to physical shards - std::map> - pendingRemoveRanges; // Pending requests to remove ranges from physical shards std::deque, Standalone>> constructedData; bool shardAware; // True if the storage server is aware of the physical shards. 
+ LocalityData locality; + // Histograms struct FetchKeysHistograms { const Reference latency; @@ -1318,7 +1276,6 @@ struct StorageServer : public IStorageMetricsService { KeyRangeMap> shards; KeyRangeMap ssBulkLoadMetadataMap; // store the latest bulkload task on ranges - std::unordered_map physicalShards; uint64_t shardChangeCounter; // max( shards->changecounter ) KeyRangeMap cachedRangeMap; // indicates if a key-range is being cached @@ -1364,6 +1321,11 @@ struct StorageServer : public IStorageMetricsService { int8_t primaryLocality; NotifiedVersion knownCommittedVersion; + std::map> + pendingAddRanges; // Pending requests to add ranges to physical shards + std::map> + pendingRemoveRanges; // Pending requests to remove ranges from physical shards + Deque> recoveryVersionSkips; int64_t versionLag; // An estimate for how many versions it takes for the data to move from the logs to this storage // server @@ -1711,9 +1673,10 @@ struct StorageServer : public IStorageMetricsService { Reference const> const& db, StorageServerInterface const& ssi, Reference encryptionMonitor) - : shardAware(false), tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, - TLOG_CURSOR_READS_LATENCY_HISTOGRAM, - Histogram::Unit::milliseconds)), + : shardAware(false), locality(ssi.locality), + tlogCursorReadsLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, + TLOG_CURSOR_READS_LATENCY_HISTOGRAM, + Histogram::Unit::milliseconds)), ssVersionLockLatencyHistogram(Histogram::getHistogram(STORAGESERVER_HISTOGRAM_GROUP, SS_VERSION_LOCK_LATENCY_HISTOGRAM, Histogram::Unit::milliseconds)), @@ -1802,25 +1765,6 @@ struct StorageServer : public IStorageMetricsService { } //~StorageServer() { fclose(log); } - void addRangeToPhysicalShard(Reference newRange) { - if (!shardAware || newRange->notAssigned()) { - return; - } - - auto [it, ignored] = - physicalShards.insert(std::make_pair(newRange->desiredShardId, 
SSPhysicalShard(newRange->desiredShardId))); - it->second.addRange(newRange); - } - - void removeRangeFromPhysicalShard(Reference range) { - if (!range.isValid() || !shardAware || range->notAssigned()) { - return; - } - - auto it = physicalShards.find(range->desiredShardId); - ASSERT(it != physicalShards.end()); - it->second.removeRange(range); - } // Puts the given shard into shards. The caller is responsible for adding shards // for all ranges in shards.getAffectedRangesAfterInsertion(newShard->keys)), because these @@ -1843,14 +1787,25 @@ struct StorageServer : public IStorageMetricsService { .detail("ShardID", format("%016llx", it->value()->desiredShardId)) .detail("NewShardID", format("%016llx", newShard->desiredShardId)) .detail("NewShardActualID", format("%016llx", newShard->shardId)); - removeRangeFromPhysicalShard(it->value()); } } } + if (shardAware && newShard->assigned()) { + ASSERT_ABORT(newShard->shardId != 0LL); + ASSERT(newShard->desiredShardId != 0LL); + if (newShard->isReadable() && newShard->version != initialClusterVersion - 1) { + if (newShard->teamId.empty()) { + TraceEvent("TeamIdMissing") + .detail("ShardId", format("%016llx", newShard->shardId)) + .detail("TeamId", newShard->teamId) + .detail("Version", newShard->version); + } + ASSERT_ABORT(!newShard->teamId.empty()); + } + } Reference rShard(newShard); shards.insert(newShard->keys, rShard); - addRangeToPhysicalShard(rShard); } void addMutation(Version version, bool fromFetch, @@ -2210,11 +2165,6 @@ void validate(StorageServer* data, bool force = false) { ASSERT(!s->value()->keys.empty()); if (data->shardAware) { s->value()->validate(); - if (!s->value()->notAssigned()) { - auto it = data->physicalShards.find(s->value()->desiredShardId); - ASSERT(it != data->physicalShards.end()); - ASSERT(it->second.hasRange(s->value())); - } } } @@ -2266,7 +2216,7 @@ void validate(StorageServer* data, bool force = false) { } if (shard->assigned() && data->shardAware) { - TraceEvent(SevVerbose, 
"ValidateAssignedShard", data->thisServerID) + TraceEvent(SevDebug, "ValidateAssignedShard", data->thisServerID) .detail("Range", shard->keys) .detailf("ShardID", "%016llx", shard->shardId) .detailf("DesiredShardID", "%016llx", shard->desiredShardId) @@ -7701,33 +7651,6 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned); void updateStorageShard(StorageServer* self, StorageServerShard shard); void setRangeBasedBulkLoadStatus(StorageServer* self, KeyRangeRef keys, const SSBulkLoadMetadata& ssBulkLoadMetadata); -void coalescePhysicalShards(StorageServer* data, KeyRangeRef keys) { - auto shardRanges = data->shards.intersectingRanges(keys); - auto fullRange = data->shards.ranges(); - - auto iter = shardRanges.begin(); - if (iter != fullRange.begin()) { - --iter; - } - auto iterEnd = shardRanges.end(); - if (iterEnd != fullRange.end()) { - ++iterEnd; - } - - KeyRangeMap>::iterator lastShard = iter; - ++iter; - - for (; iter != iterEnd; ++iter) { - if (ShardInfo::canMerge(lastShard.value().getPtr(), iter->value().getPtr())) { - ShardInfo* newShard = lastShard.value().extractPtr(); - ASSERT(newShard->mergeWith(iter->value().getPtr())); - data->addShard(newShard); - iter = data->shards.rangeContaining(newShard->keys.begin); - } - lastShard = iter; - } -} - void coalesceShards(StorageServer* data, KeyRangeRef keys) { auto shardRanges = data->shards.intersectingRanges(keys); auto fullRange = data->shards.ranges(); @@ -9030,7 +8953,8 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) .detail("Phase", "Begin") - .detail("ConcurrentTasks", data->bulkLoadMetrics->getOngoingTasks()); + .detail("ConcurrentTasks", data->bulkLoadMetrics->getOngoingTasks()) + .detail("FKID", fetchKeysID); data->bulkLoadMetrics->addTask(); } @@ -9086,19 +9010,36 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { for (auto r = ndvr.begin(); r != ndvr.end(); ++r) 
lastAvailable = std::max(lastAvailable, r->value()); + TraceEvent(SevDebug, "BeforeDurableVersion").detail("FKID", fetchKeysID); if (lastAvailable != invalidVersion && lastAvailable >= data->durableVersion.get()) { CODE_PROBE(true, "FetchKeys waits for previous available version to be durable"); wait(data->durableVersion.whenAtLeast(lastAvailable + 1)); } TraceEvent(SevDebug, "FetchKeysVersionSatisfied", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("DataMoveId", dataMoveId) .detail("ConductBulkLoad", conductBulkLoad); + if (SERVER_KNOBS->SS_GET_DATA_MOVE_ID && data->shardAware) { + // Empty team id indicates the data move will be cancelled soon. + if (shard->teamId == "" || shard->teamId == "InvalidTeam") { + TraceEvent(SevWarnAlways, "TeamIdUnavailable") + .detail("FKID", fetchKeysID) + .detail("DurableVersion", data->durableVersion.get()) + .detail("Version", data->version.get()) + .detail("DataMoveId", dataMoveId); + loop { + wait(delay(30)); + TraceEvent(SevWarnAlways, "FKPendingCancellation").detail("FKID", fetchKeysID); + } + } + } + TraceEvent(SevDebug, "BeforeFKLock").detail("FKID", fetchKeysID); wait(data->fetchKeysParallelismLock.take(TaskPriority::DefaultYield)); state FlowLock::Releaser holdingFKPL(data->fetchKeysParallelismLock); + TraceEvent(SevDebug, "FKLockTaken").detail("FKID", fetchKeysID); state double executeStart = now(); ++data->counters.fetchWaitingCount; data->counters.fetchWaitingMS += 1000 * (executeStart - startTime); @@ -9181,14 +9122,14 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // Note that error in getting GRV doesn't affect any storage server state. Therefore, we catch // all errors here without failing the storage server. When error happens, fetchVersion fall // back to the above computed fetchVersion. 
- TraceEvent(SevWarn, "FetchKeyGRVError", data->thisServerID).error(e); + TraceEvent(SevWarn, "FetchKeyGRVError", data->thisServerID).error(e).detail("FKID", fetchKeysID); lastError = e; } } ASSERT(fetchVersion >= shard->fetchVersion); // at this point, shard->fetchVersion is the last fetchVersion shard->fetchVersion = fetchVersion; TraceEvent(SevVerbose, "FetchKeysUnblocked", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("Version", fetchVersion); while (!shard->updates.empty() && shard->updates[0].version <= fetchVersion) @@ -9230,7 +9171,8 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) - .detail("Phase", "Got task metadata"); + .detail("Phase", "Got task metadata") + .detail("FKID", fetchKeysID); // Check the correctness: bulkLoadTaskMetadata stored in dataMoveMetadata must have the same // dataMoveId. ASSERT(bulkLoadTaskState.getDataMoveId() == dataMoveId); @@ -9243,14 +9185,16 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { .detail("Range", keys) .detail("Knobs", SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST) .detail("SupportsSstIngestion", data->storage.getKeyValueStore()->supportsSstIngestion()) - .detail("Phase", "File download"); + .detail("Phase", "File download") + .detail("FKID", fetchKeysID); // Attempt SST ingestion... if (SERVER_KNOBS->BULK_LOAD_USE_SST_INGEST && data->storage.getKeyValueStore()->supportsSstIngestion()) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) - .detail("Phase", "SST ingestion"); + .detail("Phase", "SST ingestion") + .detail("FKID", fetchKeysID); // Verify ranges... 
for (const auto& [range, fileSet] : *localBulkLoadFileSets) { ASSERT(keys.contains(range)); @@ -9309,7 +9253,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { (int)this_block.expectedSize() + (8 - (int)sizeof(KeyValueRef)) * this_block.size(); TraceEvent(SevDebug, "FetchKeysBlock", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("BlockRows", this_block.size()) .detail("BlockBytes", expectedBlockSize) .detail("KeyBegin", keys.begin) @@ -9337,13 +9281,14 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { if (shard->reason != DataMovementReason::INVALID && priority < SERVER_KNOBS->FETCH_KEYS_THROTTLE_PRIORITY_THRESHOLD && !data->fetchKeysLimiter.ready().isReady()) { - TraceEvent(SevDebug, "FetchKeysThrottling", data->thisServerID); + TraceEvent(SevDebug, "FetchKeysThrottling", data->thisServerID).detail("FKID", fetchKeysID); state double ts = now(); wait(data->fetchKeysLimiter.ready()); TraceEvent(SevDebug, "FetchKeysThrottled", data->thisServerID) .detail("Priority", priority) .detail("KeyRange", shard->keys) - .detail("Delay", now() - ts); + .detail("Delay", now() - ts) + .detail("FKID", fetchKeysID); } // Write this_block to storage @@ -9355,7 +9300,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { if (conductBulkLoad) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) .detail("BlockRange", blockRange) @@ -9399,14 +9344,14 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent("FKBlockFail", data->thisServerID) .errorUnsuppressed(lastError) .suppressFor(1.0) - .detail("FKID", interval.pairID); + .detail("FKID", fetchKeysID); debug_getRangeRetries++; if (debug_nextRetryToLog == debug_getRangeRetries) { debug_nextRetryToLog += std::min(debug_nextRetryToLog, 1024); 
TraceEvent(SevWarn, "FetchPast", data->thisServerID) .detail("TotalAttempts", debug_getRangeRetries) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("N", fetchVersion) .detail("E", data->version.get()); } @@ -9423,16 +9368,22 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { StorageServerShard rightShard = data->shards[keys.begin]->toStorageServerShard(); rightShard.range = KeyRangeRef(blockBegin, keys.end); auto* leftShard = ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, blockBegin), shard); + ASSERT(!leftShard->teamId.empty()); leftShard->populateShard(rightShard); + ASSERT(!rightShard.teamId.empty()); + ASSERT(!leftShard->teamId.empty()); shard->server->addShard(leftShard); shard->server->addShard(ShardInfo::newShard(data, rightShard)); } else { shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, blockBegin), shard)); - shard->server->addShard(ShardInfo::newAdding( - data, KeyRangeRef(blockBegin, keys.end), shard->reason, shard->getSSBulkLoadMetadata())); + shard->server->addShard(ShardInfo::newAdding(data, + KeyRangeRef(blockBegin, keys.end), + shard->reason, + shard->getSSBulkLoadMetadata(), + "")); if (conductBulkLoad) { TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("DataMoveId", dataMoveId.toString()) .detail("Range", keys) .detail("NewSplitBeginKey", blockBegin) @@ -9442,6 +9393,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { shard = data->shards.rangeContaining(keys.begin).value()->adding.get(); warningLogger = logFetchKeysWarning(shard); AddingShard* otherShard = data->shards.rangeContaining(blockBegin).value()->adding.get(); + ASSERT(!data->shardAware || !otherShard->teamId.empty()); keys = shard->keys; // Split our prior updates. 
The ones that apply to our new, restricted key range will go back @@ -9454,6 +9406,12 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { splitMutations(data, data->shards, *u); } + TraceEvent(SevDebug, "FetchKeysSplit") + .detail("ExpectedRange", keys) + .detail("NewBlockBegin", blockBegin) + .detail("DataMoveId", dataMoveId) + .detail("FKID", fetchKeysID); + CODE_PROBE(true, "fetchkeys has more"); CODE_PROBE(shard->updates.size(), "Shard has updates"); ASSERT(otherShard->updates.empty()); @@ -9482,10 +9440,11 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { TraceEvent(SevInfo, "FetchKeysStats", data->thisServerID) .detail("TotalBytes", totalBytes) .detail("Duration", duration) - .detail("Rate", static_cast(totalBytes) / duration); + .detail("Rate", static_cast(totalBytes) / duration) + .detail("FKID", fetchKeysID); TraceEvent(SevDebug, "FKBeforeFinalCommit", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("SV", data->storageVersion()) .detail("DV", data->durableVersion.get()); // Directly commit()ing the IKVS would interfere with updateStorage, possibly resulting in an incomplete @@ -9511,7 +9470,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { state std::unordered_map feedFetchedVersions = wait(feedFetchMain); TraceEvent(SevDebug, "FKAfterFinalCommit", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("SV", data->storageVersion()) .detail("DV", data->durableVersion.get()); @@ -9523,7 +9482,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { // After we add to the promise readyFetchKeys, update() would provide a pointer to FetchInjectionInfo that // we can put mutation in. 
FetchInjectionInfo* batch = wait(p.getFuture()); - TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", interval.pairID); + TraceEvent(SevDebug, "FKUpdateBatch", data->thisServerID).detail("FKID", fetchKeysID); shard->phase = AddingShard::FetchingCF; ASSERT(data->version.get() >= fetchVersion); @@ -9571,7 +9530,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { readOptions); TraceEvent(SevDebug, "FetchKeysHaveData", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("Version", shard->transferredVersion) .detail("StorageVersion", data->storageVersion()); validate(data); @@ -9628,7 +9587,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { Version feedTransferredVersion = data->version.get() + 1; TraceEvent(SevDebug, "FetchKeysHaveFeedData", data->thisServerID) - .detail("FKID", interval.pairID) + .detail("FKID", fetchKeysID) .detail("Version", feedTransferredVersion) .detail("StorageVersion", data->storageVersion()); @@ -9657,7 +9616,6 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { shard->readWrite.send(Void()); if (data->shardAware) { data->addShard(ShardInfo::newShard(data, newShard)); // invalidates shard! - coalescePhysicalShards(data, keys); } else { data->addShard(ShardInfo::newReadWrite(shard->keys, data)); // invalidates shard! 
coalesceShards(data, keys); @@ -9668,7 +9626,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { ++data->counters.fetchExecutingCount; data->counters.fetchExecutingMS += 1000 * (now() - executeStart); - TraceEvent(SevDebug, interval.end(), data->thisServerID); + TraceEvent(SevDebug, interval.end(), data->thisServerID).detail("FKID", fetchKeysID); if (conductBulkLoad) { data->bulkLoadMetrics->removeTask(); // Do best effort cleanup @@ -9677,6 +9635,7 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { } catch (Error& e) { TraceEvent(SevDebug, interval.end(), data->thisServerID) + .detail("FKID", fetchKeysID) .errorUnsuppressed(e) .detail("Version", data->version.get()); if (!data->shuttingDown) { @@ -9697,13 +9656,15 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { data->newestDirtyVersion.insert(keys, data->data().getLatestVersion()); } } + TraceEvent(SevError, "FetchKeysError", data->thisServerID) .error(e) .detail("Elapsed", now() - startTime) .detail("KeyBegin", keys.begin) .detail("KeyEnd", keys.end) .detail("FetchVersion", fetchVersion) - .detail("KnownCommittedVersion", data->knownCommittedVersion.get()); + .detail("KnownCommittedVersion", data->knownCommittedVersion.get()) + .detail("FKID", fetchKeysID); if (e.code() != error_code_actor_cancelled) data->otherError.sendError(e); // Kill the storage server. Are there any recoverable errors? 
if (conductBulkLoad) { @@ -9720,9 +9681,10 @@ ACTOR Future fetchKeys(StorageServer* data, AddingShard* shard) { AddingShard::AddingShard(StorageServer* server, KeyRangeRef const& keys, DataMovementReason reason, - const SSBulkLoadMetadata& ssBulkLoadMetadata) + const SSBulkLoadMetadata& ssBulkLoadMetadata, + std::string teamId) : keys(keys), server(server), transferredVersion(invalidVersion), fetchVersion(invalidVersion), phase(WaitPrevious), - reason(reason), ssBulkLoadMetadata(ssBulkLoadMetadata) { + reason(reason), ssBulkLoadMetadata(ssBulkLoadMetadata), teamId(teamId) { fetchClient = fetchKeys(server, this); } @@ -10237,7 +10199,6 @@ ACTOR Future fetchShardApplyUpdates(StorageServer* data, .detail("MoveInShard", moveInShard->toString()); data->addShard(ShardInfo::newShard(data, newShard)); data->newestAvailableVersion.insert(range, latestVersion); - coalescePhysicalShards(data, range); } validate(data); moveInShard->readWrite.send(Void()); @@ -10623,13 +10584,13 @@ ShardInfo* ShardInfo::newShard(StorageServer* data, const StorageServerShard& sh // moves. For case 1, the bulkload is available only if the encode_shard_location_metadata is on. Therefore, the // old data moves is never for bulkload. For case 2, fallback happens only if fetchCheckpoint fails which is not // a case for bulkload which does not do fetchCheckpoint. - res = newAdding(data, shard.range, DataMovementReason::INVALID, SSBulkLoadMetadata()); + res = newAdding(data, shard.range, DataMovementReason::INVALID, SSBulkLoadMetadata(), shard.teamId); break; case StorageServerShard::ReadWritePending: TraceEvent(SevWarnAlways, "CancellingAlmostReadyMoveInShard").detail("StorageServerShard", shard.toString()); ASSERT(!shard.moveInShardId.present()); // TODO(BulkLoad): current bulkload with ShardedRocksDB and PhysicalShardMove cannot handle this fallback case. 
- res = newAdding(data, shard.range, DataMovementReason::INVALID, SSBulkLoadMetadata()); + res = newAdding(data, shard.range, DataMovementReason::INVALID, SSBulkLoadMetadata(), shard.teamId); break; case StorageServerShard::MovingIn: { ASSERT(shard.moveInShardId.present()); @@ -10726,6 +10687,7 @@ ACTOR Future restoreShards(StorageServer* data, } else { StorageServerShard rightShard = ranges[i].value->toStorageServerShard(); rightShard.range = range; + rightShard.teamId = shard.teamId; data->addShard(ShardInfo::newShard(data, rightShard)); } } @@ -10803,7 +10765,6 @@ ACTOR Future restoreShards(StorageServer* data, wait(yield()); } - coalescePhysicalShards(data, allKeys); validate(data, /*force=*/true); TraceEvent(SevInfo, "StorageServerRestoreShardsEnd", data->thisServerID).detail("Version", version); @@ -10935,8 +10896,11 @@ void changeServerKeys(StorageServer* data, data->addShard(ShardInfo::newReadWrite(ranges[i], data)); else { ASSERT(ranges[i].value->adding); - data->addShard(ShardInfo::newAdding( - data, ranges[i], ranges[i].value->adding->reason, ranges[i].value->adding->getSSBulkLoadMetadata())); + data->addShard(ShardInfo::newAdding(data, + ranges[i], + ranges[i].value->adding->reason, + ranges[i].value->adding->getSSBulkLoadMetadata(), + "")); CODE_PROBE(true, "ChangeServerKeys reFetchKeys"); } } @@ -10991,7 +10955,7 @@ void changeServerKeys(StorageServer* data, } else { auto& shard = data->shards[range.begin]; if (!shard->assigned() || shard->keys != range) - data->addShard(ShardInfo::newAdding(data, range, dataMoveReason, bulkLoadInfoForAddingShard)); + data->addShard(ShardInfo::newAdding(data, range, dataMoveReason, bulkLoadInfoForAddingShard, "")); } } else { changeNewestAvailable.emplace_back(range, latestVersion); @@ -11100,13 +11064,11 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, // When TSS is lagging behind, it could see data move conflicts. The conflicting TSS will not recover from error and // needs to be removed. 
Severity sev = data->isTss() ? SevWarnAlways : SevError; + + // Re-align shard boundaries and validate CSK scenarios. for (int i = 0; i < ranges.size(); i++) { const Reference currentShard = ranges[i].value; const KeyRangeRef currentRange = static_cast(ranges[i]); - if (currentShard.isValid()) { - TraceEvent(sevDm, "OverlappingPhysicalShard", data->thisServerID) - .detail("PhysicalShard", currentShard->toStorageServerShard().toString()); - } if (!currentShard.isValid()) { if (currentRange != keys) { TraceEvent(sev, "PhysicalShardStateError") @@ -11119,7 +11081,13 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("InitialVersion", currentShard->version); throw dataMoveConflictError(data->isTss()); } - } else if (currentShard->notAssigned()) { + continue; + } + + TraceEvent(sevDm, "OverlappingPhysicalShard", data->thisServerID) + .detail("PhysicalShard", currentShard->toStorageServerShard().toString()); + + if (currentShard->notAssigned()) { if (!nowAssigned) { TraceEvent(sev, "PhysicalShardStateError") .detail("SubError", "UnassignEmptyRange") @@ -11140,7 +11108,10 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else if (currentShard->isReadable()) { + continue; + } + + if (currentShard->isReadable()) { StorageServerShard newShard = currentShard->toStorageServerShard(); newShard.range = currentRange; data->addShard(ShardInfo::newShard(data, newShard)); @@ -11149,7 +11120,11 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else if (currentShard->adding) { + continue; + } + + // Shard is being moved. 
+ if (currentShard->adding) { if (nowAssigned) { TraceEvent(sev, "PhysicalShardStateError") .detail("SubError", "UpdateAddingShard") @@ -11162,6 +11137,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("InitialVersion", currentShard->version); throw dataMoveConflictError(data->isTss()); } + + // FetchKeys will be cancelled. StorageServerShard newShard = currentShard->toStorageServerShard(); newShard.range = currentRange; data->addShard(ShardInfo::newShard(data, newShard)); @@ -11170,7 +11147,10 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else if (currentShard->moveInShard) { + continue; + } + + if (currentShard->moveInShard) { if (nowAssigned) { TraceEvent(sev, "PhysicalShardStateError") .detail("SubError", "UpdateMoveInShard") @@ -11183,6 +11163,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("InitialVersion", currentShard->version); throw dataMoveConflictError(data->isTss()); } + + // FetchShard will be cancelled. currentShard->moveInShard->cancel(); updatedMoveInShards.emplace(currentShard->moveInShard->id(), currentShard->moveInShard); StorageServerShard newShard = currentShard->toStorageServerShard(); @@ -11193,11 +11175,13 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("ResultingShard", newShard.toString()); - } else { - ASSERT(false); + continue; } + + ASSERT(false); // unreachable. } + // Update shards in shard map. auto vr = data->shards.intersectingRanges(keys); std::vector> changeNewestAvailable; std::vector removeRanges; @@ -11214,17 +11198,24 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("ShardState", r->value()->debugDescribeState()); ASSERT(keys.contains(r->range())); + + // Assign empty range on data loss. 
if (context == CSK_ASSIGN_EMPTY && !dataAvailable) { ASSERT(nowAssigned); TraceEvent(sevDm, "ChangeServerKeysAddEmptyRange", data->thisServerID) .detail("Range", range) - .detail("Version", cVer); + .detail("Version", cVer) + .detail("DataMoveId", dataMoveId); newEmptyRanges.push_back(range); - updatedShards.emplace_back(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite); - if (data->physicalShards.find(desiredId) == data->physicalShards.end()) { - data->pendingAddRanges[cVer].emplace_back(desiredId, range); - } - } else if (!nowAssigned) { + // auto shardInfo = ShardInfo(range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite); + updatedShards.emplace_back( + range, cVer, desiredId, desiredId, StorageServerShard::ReadWrite, (std::string) "EmptyRange"); + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + continue; + } + + // Unassign shard and remove data range if exists. + if (!nowAssigned) { if (dataAvailable) { ASSERT(data->newestAvailableVersion[range.begin] == latestVersion); // Not that we care, but this used to be checked instead of dataAvailable @@ -11246,89 +11237,109 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, .detail("NowAssigned", nowAssigned) .detail("Version", cVer) .detail("NewShard", updatedShards.back().toString()); - } else if (!dataAvailable) { - if (version == data->initialClusterVersion - 1) { - TraceEvent(sevDm, "CSKWithPhysicalShardsSeedRange", data->thisServerID) - .detail("ShardID", desiredId) - .detail("Range", range); - changeNewestAvailable.emplace_back(range, latestVersion); - updatedShards.push_back( - StorageServerShard(range, version, desiredId, desiredId, StorageServerShard::ReadWrite)); - setAvailableStatus(data, range, true); - // Note: The initial range is available, however, the shard won't be created in the storage engine - // until version is committed. 
- data->pendingAddRanges[cVer].emplace_back(desiredId, range); - TraceEvent(sevDm, "SSInitialShard", data->thisServerID) - .detail("Range", range) - .detail("NowAssigned", nowAssigned) - .detail("Version", cVer) - .detail("NewShard", updatedShards.back().toString()); + continue; + } + + // Shard already available in SS. Update desired shard id. + if (dataAvailable) { + auto& shard = data->shards[range.begin]; + TraceEvent(sevDm, "SSAssignShardAlreadyAvailable", data->thisServerID) + .detail("Range", range) + .detail("NowAssigned", nowAssigned) + .detail("Version", cVer) + .detail("ExistingShardId", shard->shardId) + .detail("ExistingDesiredShardId", shard->desiredShardId) + .detail("DataMoveId", dataMoveId); + updatedShards.push_back(StorageServerShard(range, + cVer, + data->shards[range.begin]->shardId, + desiredId, + StorageServerShard::ReadWrite, + shard->teamId)); + changeNewestAvailable.emplace_back(range, latestVersion); + continue; + } + + ASSERT(!dataAvailable); + + // Assign a shard to storage server. Skip fetchKeys if the cluster is in initial state. + if (version == data->initialClusterVersion - 1) { + TraceEvent(sevDm, "CSKWithPhysicalShardsSeedRange", data->thisServerID) + .detail("ShardID", desiredId) + .detail("Range", range); + changeNewestAvailable.emplace_back(range, latestVersion); + updatedShards.push_back(StorageServerShard(range, + version, + desiredId, + desiredId, + StorageServerShard::ReadWrite, + /*teamId=*/(std::string) "SeedRange")); + setAvailableStatus(data, range, true); + // Note: The initial range is available, however, the shard won't be created in the storage engine + // until version is committed. 
+ data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + TraceEvent(sevDm, "SSInitialShard", data->thisServerID) + .detail("Range", range) + .detail("NowAssigned", nowAssigned) + .detail("Version", cVer) + .detail("NewShard", updatedShards.back().toString()); + + continue; + } + + auto& shard = data->shards[range.begin]; + if (!shard->assigned()) { + if (enablePSM) { + std::shared_ptr moveInShard = data->getMoveInShard(dataMoveId, cVer, conductBulkLoad); + moveInShard->addRange(range); + updatedMoveInShards.emplace(moveInShard->id(), moveInShard); + updatedShards.push_back(StorageServerShard( + range, cVer, desiredId, desiredId, StorageServerShard::MovingIn, moveInShard->id())); } else { - auto& shard = data->shards[range.begin]; - if (!shard->assigned()) { - if (enablePSM) { - std::shared_ptr moveInShard = - data->getMoveInShard(dataMoveId, cVer, conductBulkLoad); - moveInShard->addRange(range); - updatedMoveInShards.emplace(moveInShard->id(), moveInShard); - updatedShards.push_back(StorageServerShard( - range, cVer, desiredId, desiredId, StorageServerShard::MovingIn, moveInShard->id())); - } else { - updatedShards.push_back( - StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); - data->pendingAddRanges[cVer].emplace_back(desiredId, range); - } - data->newestDirtyVersion.insert(range, cVer); - TraceEvent(sevDm, "SSAssignShard", data->thisServerID) - .detail("Range", range) - .detail("NowAssigned", nowAssigned) - .detail("Version", cVer) - .detail("TotalAssignedAtVer", ++totalAssignedAtVer) - .detail("ConductBulkLoad", conductBulkLoad) - .detail("NewShard", updatedShards.back().toString()); - } else { - ASSERT(shard->adding != nullptr || shard->moveInShard != nullptr); - if (shard->desiredShardId != desiredId) { - TraceEvent(SevWarnAlways, "CSKConflictingMoveInShards", data->thisServerID) - .detail("DataMoveID", dataMoveId) - .detail("Range", range) - .detailf("TargetShard", "%016llx", desiredId) - 
.detailf("CurrentShard", "%016llx", shard->desiredShardId) - .detail("IsTSS", data->isTss()) - .detail("Version", cVer); - throw dataMoveConflictError(data->isTss()); - } else { - TraceEvent(SevInfo, "CSKMoveInToSameShard", data->thisServerID) - .detail("DataMoveID", dataMoveId) - .detailf("TargetShard", "%016llx", desiredId) - .detail("MoveRange", keys) - .detail("Range", range) - .detail("ExistingShardRange", shard->keys) - .detail("ShardDebugString", shard->debugDescribeState()) - .detail("Version", cVer); - if (context == CSK_FALL_BACK) { - updatedShards.push_back( - StorageServerShard(range, cVer, desiredId, desiredId, StorageServerShard::Adding)); - // Physical shard move fall back happens if and only if the data move is failed to get the - // checkpoint. However, this case never happens the bulkload. So, the bulkload does not - // support fall back. - ASSERT(!conductBulkLoad); // TODO(BulkLoad): remove this assert - data->pendingAddRanges[cVer].emplace_back(desiredId, range); - data->newestDirtyVersion.insert(range, cVer); - // TODO: removeDataRange if the moveInShard has written to the kvs. 
- } - } - } + updatedShards.push_back(StorageServerShard( + range, cVer, desiredId, desiredId, StorageServerShard::Adding, (std::string) "")); + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); } - } else { - updatedShards.push_back(StorageServerShard( - range, cVer, data->shards[range.begin]->shardId, desiredId, StorageServerShard::ReadWrite)); - changeNewestAvailable.emplace_back(range, latestVersion); - TraceEvent(sevDm, "SSAssignShardAlreadyAvailable", data->thisServerID) + data->newestDirtyVersion.insert(range, cVer); + TraceEvent(sevDm, "SSAssignShard", data->thisServerID) .detail("Range", range) .detail("NowAssigned", nowAssigned) .detail("Version", cVer) + .detail("TotalAssignedAtVer", ++totalAssignedAtVer) + .detail("ConductBulkLoad", conductBulkLoad) .detail("NewShard", updatedShards.back().toString()); + } else { + // Shard is being moved. + ASSERT(shard->adding != nullptr || shard->moveInShard != nullptr); + if (shard->desiredShardId != desiredId) { + TraceEvent(SevWarnAlways, "CSKConflictingMoveInShards", data->thisServerID) + .detail("DataMoveID", dataMoveId) + .detail("Range", range) + .detailf("TargetShard", "%016llx", desiredId) + .detailf("CurrentShard", "%016llx", shard->desiredShardId) + .detail("IsTSS", data->isTss()) + .detail("Version", cVer); + throw dataMoveConflictError(data->isTss()); + } + TraceEvent(SevInfo, "CSKMoveInToSameShard", data->thisServerID) + .detail("DataMoveID", dataMoveId) + .detailf("TargetShard", "%016llx", desiredId) + .detail("MoveRange", keys) + .detail("Range", range) + .detail("ExistingShardRange", shard->keys) + .detail("ShardDebugString", shard->debugDescribeState()) + .detail("Version", cVer); + if (context == CSK_FALL_BACK) { + updatedShards.push_back(StorageServerShard( + range, cVer, desiredId, desiredId, StorageServerShard::Adding, (std::string) "")); + // Physical shard move fall back happens if and only if the data move is failed to get the + // checkpoint. 
However, this case never happens for the bulkload. So, the bulkload does not + support fall back. + ASSERT(!conductBulkLoad); // TODO(BulkLoad): remove this assert + data->pendingAddRanges[cVer].emplace_back(desiredId, dataMoveId, range, Reference()); + data->newestDirtyVersion.insert(range, cVer); + // TODO: removeDataRange if the moveInShard has written to the kvs. + } } } @@ -11336,7 +11347,21 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, data->addShard(ShardInfo::newShard(data, shard)); updateStorageShard(data, shard); } + + // Link shard info to pending new ranges. + // TODO: consider refactoring to avoid extra shard look up. + if (data->pendingAddRanges.find(cVer) != data->pendingAddRanges.end()) { + for (auto& shard : data->pendingAddRanges[cVer]) { + auto it = data->shards.rangeContaining(shard.range.begin); + ASSERT(it->value()); + ASSERT(it.range().end == shard.range.end); + shard.shardInfo = it.value(); + } + } + auto& mLV = data->addVersionToMutationLog(data->data().getLatestVersion()); + + // Persist physical shard move metadata. 
for (const auto& [id, shard] : updatedMoveInShards) { data->addMutationToMutationLog( mLV, MutationRef(MutationRef::SetValue, persistMoveInShardKey(id), moveInShardValue(*shard->meta))); @@ -11352,14 +11377,12 @@ void changeServerKeysWithPhysicalShards(StorageServer* data, data->metrics.notifyNotReadable(keys); } - coalescePhysicalShards(data, KeyRangeRef(ranges[0].begin, ranges[ranges.size() - 1].end)); - // Now it is OK to do removeDataRanges, directly and through fetchKeys cancellation (and we have to do so before // validate()) oldShards.clear(); ranges.clear(); for (auto r = removeRanges.begin(); r != removeRanges.end(); ++r) { - removeDataRange(data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r); + removeDataRange(data, mLV, data->shards, *r); setAvailableStatus(data, *r, false); } @@ -11543,7 +11566,8 @@ class StorageUpdater { .detail("Version", ver) .detail("EnablePSM", enablePSM) .detail("DataMoveId", dataMoveId.toString()) - .detail("ConductBulkLoad", conductBulkLoad); + .detail("ConductBulkLoad", conductBulkLoad) + .detail("Context", changeServerKeysContextName(context)); if (data->shardAware) { setAssignedStatus(data, keys, nowAssigned); changeServerKeysWithPhysicalShards( @@ -12837,6 +12861,53 @@ ACTOR Future createCheckpoint(StorageServer* data, CheckpointMetaData meta return Void(); } +ACTOR Future getDataMoveMetadata(Version version, + std::vector shards, + StorageServer* self) { + state Transaction tr(self->cx); + tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE); + tr.setOption(FDBTransactionOptions::LOCK_AWARE); + + loop { + try { + tr.reset(); + state std::vector>> fDataMoves; + for (auto& shard : shards) { + fDataMoves.push_back(tr.get(dataMoveKeyFor(shard.dataMoveId))); + } + wait(waitForAll(fDataMoves)); + break; + } catch (Error& e) { + wait(tr.onError(e)); + } + } + for (int i = 0; i < shards.size(); ++i) { + auto& shard = shards[i]; + if (fDataMoves[i].get().present()) { + DataMoveMetaData 
dataMove = decodeDataMoveValue(fDataMoves[i].get().get()); + + if (!dataMove.ranges.empty()) { + if (!dataMove.dcTeamIds.present()) { + TraceEvent("TeamIdNotSet").detail("DataMoveId", shard.dataMoveId).detail("Range", shard.range); + continue; + } + auto teamId = dataMove.dcTeamIds.get()[self->locality.describeDcId()]; + ASSERT(dataMove.ranges.front().contains(shard.range)); + shard.shardInfo->teamId = teamId; + auto& addingShard = shard.shardInfo->adding; + ASSERT(addingShard); + addingShard->teamId = teamId; + TraceEvent(SevDebug, "GotValidTeamId").detail("Range", shard.range).detail("TeamId", teamId); + } else { + TraceEvent(SevDebug, "CancelledDataMove").detail("DataMoveId", shards[i].dataMoveId); + } + } + + shard.shardInfo.clear(); + } + return Void(); +} + struct UpdateStorageCommitStats { double beforeStorageUpdates; double beforeStorageCommit; @@ -12980,6 +13051,10 @@ ACTOR Future updateStorage(StorageServer* data) { fAddRanges.push_back(data->storage.addRange(shard.range, shard.shardId)); } wait(waitForAll(fAddRanges)); + if (SERVER_KNOBS->SS_GET_DATA_MOVE_ID) { + wait(getDataMoveMetadata( + data->pendingAddRanges.begin()->first, data->pendingAddRanges.begin()->second, data)); + } TraceEvent(SevVerbose, "SSAddKVSRangeEnd", data->thisServerID) .detail("Version", data->pendingAddRanges.begin()->first) .detail("DurableVersion", data->durableVersion.get()); @@ -14716,6 +14791,7 @@ ACTOR Future storageEngineConsistencyCheck(StorageServer* self) { } CoalescedKeyRangeMap currentShards; + std::unordered_map teamShardCount; currentShards.insert(allKeys, ""); auto fullRange = self->shards.ranges(); for (auto it = fullRange.begin(); it != fullRange.end(); ++it) { @@ -14724,9 +14800,14 @@ ACTOR Future storageEngineConsistencyCheck(StorageServer* self) { } if (it.value()->assigned()) { currentShards.insert(it.range(), format("%016llx", it.value()->shardId)); + teamShardCount[it.value()->teamId]++; } } + for (const auto& kv : teamShardCount) { + 
TraceEvent("StorageServerTeamShardCount").detail("TeamId", kv.first).detail("ShardCount", kv.second); + } + auto kvShards = self->storage.getExistingRanges(); TraceEvent(SevInfo, "StorageEngineConsistencyCheckStarted").log(); @@ -15442,6 +15523,9 @@ ACTOR Future storageServer(IKeyValueStore* persistentData, self.moveInShards.clear(); + // Clear pending shards so relevant actors could be destroyed. + self.pendingAddRanges.clear(); + state Error err = e; if (storageServerTerminated(self, persistentData, err)) { ssCore.cancel();