@@ -1546,7 +1546,7 @@ class ShardManager {
         for (auto it = ranges.begin(); it != ranges.end(); ++it) {
             if (it.value()) {
                 if (it.value()->physicalShard->id == id) {
-                    TraceEvent(SevError, "ShardedRocksDBAddRange")
+                    TraceEvent(SevWarn, "ShardedRocksDBAddRange")
                         .detail("ErrorType", "RangeAlreadyExist")
                         .detail("IntersectingRange", it->range())
                         .detail("DataShardRange", it->value()->range)
@@ -1564,15 +1564,45 @@ class ShardManager {
             }
         }

-        auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
-        auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
-        std::shared_ptr<PhysicalShard>& shard = it->second;
+        auto it = physicalShards.find(id);
+        std::shared_ptr<PhysicalShard> physicalShard = nullptr;
+        if (it != physicalShards.end()) {
+            physicalShard = it->second;
+            // TODO: consider coalescing continuous key ranges.
+            if (!SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES) {
+                bool continuous = physicalShard->dataShards.empty();
+                std::string rangeStr = "";
+                for (auto& [_, shard] : physicalShard->dataShards) {
+                    rangeStr += shard->range.toString() + ", ";
+                    if (shard->range.begin < range.begin && shard->range.end == range.begin) {
+                        continuous = true;
+                        break;
+                    }
+                    if (shard->range.begin > range.begin && range.end == shard->range.begin) {
+                        continuous = true;
+                        break;
+                    }
+                }
+                if (!continuous) {
+                    // When multiple shards are merged into a single shard, the storage server might already own a piece
+                    // of the resulting shard. Because intra-storage-server moves are disabled, the merge data move could
+                    // create multiple segments in a single physical shard.
+                    TraceEvent("AddMultipleRanges")
+                        .detail("NewRange", range)
+                        .detail("OtherRanges", rangeStr)
+                        .setMaxFieldLength(1000);
+                }
+            }
+        } else {
+            auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
+            auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
+            physicalShard = it->second;
+        }

         activePhysicalShardIds.emplace(id);
-
-        auto dataShard = std::make_unique<DataShard>(range, shard.get());
+        auto dataShard = std::make_unique<DataShard>(range, physicalShard.get());
         dataShardMap.insert(range, dataShard.get());
-        shard->dataShards[range.begin.toString()] = std::move(dataShard);
+        physicalShard->dataShards[range.begin.toString()] = std::move(dataShard);

         validate();

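A note on the continuity check in the hunk above: a new range is accepted as continuous only when the physical shard is empty or the range directly abuts one of the ranges the shard already holds. Below is a minimal standalone sketch of that adjacency test; it uses a hypothetical SimpleRange type with string [begin, end) bounds instead of FDB's KeyRange/DataShard and mirrors the loop over dataShards, but it is not the actual implementation.

#include <map>
#include <string>

// Hypothetical stand-in for a key range; the real code uses FDB's KeyRange.
struct SimpleRange {
    std::string begin, end; // half-open interval [begin, end)
};

// True if `range` is adjacent to one of the existing ranges (or the shard holds
// no ranges yet), analogous to the loop over physicalShard->dataShards above.
bool isContinuous(const std::map<std::string, SimpleRange>& existing, const SimpleRange& range) {
    if (existing.empty()) {
        return true;
    }
    for (const auto& entry : existing) {
        const SimpleRange& r = entry.second;
        if (r.begin < range.begin && r.end == range.begin) {
            return true; // an existing range ends exactly where the new one starts
        }
        if (r.begin > range.begin && range.end == r.begin) {
            return true; // the new range ends exactly where an existing one starts
        }
    }
    return false;
}

For example, adding [d, g) to a shard that already holds [a, d) passes the test, while adding [g, n) to a shard that holds only [a, d) does not; the latter is the case the AddMultipleRanges event records.
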
@@ -1581,7 +1611,7 @@ class ShardManager {
             .detail("ShardId", id)
             .detail("Active", active);

-        return shard.get();
+        return physicalShard.get();
     }

     std::vector<std::string> removeRange(KeyRange range) {
@@ -1636,6 +1666,7 @@ class ShardManager {

             // Range modification could result in more than one segment. Remove the original segment key here.
             existingShard->dataShards.erase(shardRange.begin.toString());
+            int count = 0;
             if (shardRange.begin < range.begin) {
                 auto dataShard =
                     std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
@@ -1646,6 +1677,7 @@ class ShardManager {

                 existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
                 logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+                count++;
             }

             if (shardRange.end > range.end) {
@@ -1658,6 +1690,18 @@ class ShardManager {

                 existingShard->dataShards[range.end.toString()] = std::move(dataShard);
                 logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+                count++;
+            }
+
+            if (count > 1) {
+                // During a shard split, a shard could be split into multiple key ranges. One of the key ranges will
+                // remain on this storage server, while the other key ranges are moved to new servers. Depending on the
+                // order in which the split data moves are executed, a shard could be broken into multiple pieces.
+                // Eventually a single continuous key range will remain on the physical shard. Data consistency is guaranteed.
+                //
+                // For team-based shard placement, we expect multiple data shards to be located on the same physical
+                // shard.
+                TraceEvent("RangeSplit").detail("OriginalRange", shardRange).detail("RemovedRange", range);
             }
         }

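Regarding the new count bookkeeping: removing a range from an existing data shard range can leave a left remainder, a right remainder, or both, and count > 1 is the both-sides case that the RangeSplit event records. The sketch below computes those remainders with the same hypothetical SimpleRange type (string [begin, end) bounds), assuming the removed range overlaps the shard range as it does in removeRange; it is illustrative only.

#include <string>
#include <vector>

// Hypothetical stand-in for FDB's KeyRange.
struct SimpleRange {
    std::string begin, end; // half-open interval [begin, end)
};

// Remainder pieces of `shardRange` after removing `range`, mirroring the two
// branches above: a left piece, a right piece, or both (a split).
std::vector<SimpleRange> remaindersAfterRemove(const SimpleRange& shardRange, const SimpleRange& range) {
    std::vector<SimpleRange> pieces;
    if (shardRange.begin < range.begin) {
        pieces.push_back({ shardRange.begin, range.begin });
    }
    if (shardRange.end > range.end) {
        pieces.push_back({ range.end, shardRange.end });
    }
    return pieces; // pieces.size() > 1 corresponds to the RangeSplit trace
}

For instance, removing [c, k) from a shard range [a, n) leaves [a, c) and [k, n), i.e. two pieces, whereas removing [c, k) from [a, d) leaves only [a, c).
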
@@ -1986,28 +2030,37 @@ class ShardManager {
         }

         TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
+        int totalDataShards = 0;
+        int expectedDataShards = 0;
         for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
             TraceEvent e(SevVerbose, "ShardedRocksValidateDataShardMap", this->logId);
             e.detail("Range", s->range());
             const DataShard* shard = s->value();
             e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
-            if (shard != nullptr) {
-                e.detail("PhysicalShard", shard->physicalShard->id);
-            } else {
-                e.detail("Shard", "Empty");
+            if (shard == nullptr) {
+                e.detail("RangeUnassigned", "True");
+                continue;
             }
-            if (shard != nullptr) {
-                if (shard->range != static_cast<KeyRangeRef>(s->range())) {
-                    TraceEvent(SevWarnAlways, "ShardRangeMismatch").detail("Range", s->range());
-                }
-
-                ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
-                ASSERT(shard->physicalShard != nullptr);
-                auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
-                ASSERT(it != shard->physicalShard->dataShards.end());
-                ASSERT(it->second.get() == shard);
+            totalDataShards++;
+            if (shard->range != static_cast<KeyRangeRef>(s->range())) {
+                TraceEvent(SevWarnAlways, "ShardRangeMismatch")
+                    .detail("Range", s->range())
+                    .detail("DataShardRange", shard->range)
+                    .detail("PhysicalShardId", shard->physicalShard->id);
             }
+
+            ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
+            ASSERT(shard->physicalShard != nullptr);
+            auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
+            ASSERT(it != shard->physicalShard->dataShards.end());
+            ASSERT(it->second.get() == shard);
+        }
+
+        for (auto [shardId, physicalShard] : physicalShards) {
+            ASSERT(physicalShard);
+            expectedDataShards += physicalShard->dataShards.size();
         }
+        ASSERT_EQ(expectedDataShards, totalDataShards);
     }

 private:
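The counting added to validate() enforces a bookkeeping invariant: every assigned entry in dataShardMap is owned by exactly one physical shard, so the number of assigned ranges must equal the total size of the dataShards maps across all physical shards. A toy, self-contained illustration of that invariant with plain std::map stand-ins (hypothetical data, not the actual ShardManager structures):

#include <cassert>
#include <map>
#include <string>

int main() {
    // physical shard id -> (range begin -> range end), standing in for each
    // PhysicalShard's dataShards map.
    std::map<std::string, std::map<std::string, std::string>> physicalShards = {
        { "shard-1", { { "a", "d" }, { "g", "n" } } },
        { "shard-2", { { "x", "z" } } },
    };
    // range begin -> owning physical shard id, standing in for the assigned
    // entries of dataShardMap (unassigned ranges are simply not listed).
    std::map<std::string, std::string> assignedRanges = {
        { "a", "shard-1" }, { "g", "shard-1" }, { "x", "shard-2" },
    };

    int totalDataShards = static_cast<int>(assignedRanges.size());
    int expectedDataShards = 0;
    for (const auto& [id, dataShards] : physicalShards) {
        expectedDataShards += static_cast<int>(dataShards.size());
    }
    assert(expectedDataShards == totalDataShards); // the check added to validate()
    return 0;
}
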
@@ -4403,6 +4456,81 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
     return Void();
 }

+TEST_CASE("noSim/ShardedRocksDBRangeOps/RemoveSplitRange") {
+    state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
+    platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+    state ShardedRocksDBKeyValueStore* rocksdbStore =
+        new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
+    state IKeyValueStore* kvStore = rocksdbStore;
+    wait(kvStore->init());
+
+    // Add two ranges to the same shard.
+    {
+        std::vector<Future<Void>> addRangeFutures;
+        addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "d"_sr), "shard-1"));
+        addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("g"_sr, "n"_sr), "shard-1"));
+
+        wait(waitForAll(addRangeFutures));
+    }
+
+    state std::set<std::string> originalKeys = { "a", "b", "c", "g", "h", "m" };
+    state std::set<std::string> currentKeys = originalKeys;
+    for (auto key : originalKeys) {
+        kvStore->set(KeyValueRef(key, key));
+    }
+    wait(kvStore->commit());
+
+    state std::string key;
+    for (auto key : currentKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        ASSERT(val.present());
+        ASSERT(val.get().toString() == key);
+    }
+
+    // Remove a single range.
+    std::vector<std::string> shardsToCleanUp;
+    auto shardIds = kvStore->removeRange(KeyRangeRef("b"_sr, "c"_sr));
+    // removeRange didn't create an empty shard.
+    ASSERT_EQ(shardIds.size(), 0);
+
+    currentKeys.erase("b");
+    for (auto key : originalKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        if (currentKeys.contains(key)) {
+            ASSERT(val.present());
+            ASSERT(val.get().toString() == key);
+        } else {
+            ASSERT(!val.present());
+        }
+    }
+
+    // Remove a range spanning multiple sub-ranges.
+    shardIds = kvStore->removeRange(KeyRangeRef("c"_sr, "k"_sr));
+    ASSERT(shardIds.empty());
+
+    currentKeys.erase("c");
+    currentKeys.erase("g");
+    currentKeys.erase("h");
+    for (auto key : originalKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        if (currentKeys.contains(key)) {
+            ASSERT(val.present());
+            ASSERT(val.get().toString() == key);
+        } else {
+            ASSERT(!val.present());
+        }
+    }
+
+    {
+        Future<Void> closed = kvStore->onClosed();
+        kvStore->dispose();
+        wait(closed);
+    }
+    ASSERT(!directoryExists(rocksDBTestDir));
+    return Void();
+}
+
 TEST_CASE("noSim/ShardedRocksDBCheckpoint/CheckpointBasic") {
     state std::string rocksDBTestDir = "sharded-rocks-checkpoint-restore";
     state std::map<Key, Value> kvs({ { "a"_sr, "TestValueA"_sr },
@@ -4777,7 +4905,7 @@ ACTOR Future<Void> testWrites(IKeyValueStore* kvStore, int writeCount) {
     state int i = 0;

     while (i < writeCount) {
-        state int endCount = deterministicRandom()->randomInt(i, i + 1000);
+        state int endCount = deterministicRandom()->randomInt(i + 1, i + 1000);
         state std::string beginKey = format("key-%6d", i);
         state std::string endKey = format("key-%6d", endCount);
         wait(kvStore->addRange(KeyRangeRef(beginKey, endKey), deterministicRandom()->randomUniqueID().toString()));