-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HBASE-28513 The StochasticLoadBalancer should support discrete evaluations #6651
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,19 +26,26 @@ | |
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
import org.agrona.collections.Hashing; | ||
import org.agrona.collections.Int2IntCounterMap; | ||
import org.apache.hadoop.hbase.HDFSBlocksDistribution; | ||
import org.apache.hadoop.hbase.ServerName; | ||
import org.apache.hadoop.hbase.client.RegionInfo; | ||
import org.apache.hadoop.hbase.client.RegionReplicaUtil; | ||
import org.apache.hadoop.hbase.master.RackManager; | ||
import org.apache.hadoop.hbase.master.RegionPlan; | ||
import org.apache.hadoop.hbase.net.Address; | ||
import org.apache.hadoop.hbase.util.Pair; | ||
import org.apache.yetus.audience.InterfaceAudience; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import org.apache.hbase.thirdparty.com.google.common.base.Supplier; | ||
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; | ||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; | ||
|
||
/** | ||
* An efficient array based implementation similar to ClusterState for keeping the status of the | ||
* cluster in terms of region assignment and distribution. LoadBalancers, such as | ||
|
@@ -123,6 +130,14 @@ class BalancerClusterState { | |
// Maps regionName -> oldServerName -> cache ratio of the region on the old server | ||
Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap; | ||
|
||
private Supplier<List<Integer>> shuffledServerIndicesSupplier = | ||
Suppliers.memoizeWithExpiration(() -> { | ||
Collection<Integer> serverIndices = serversToIndex.values(); | ||
List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices); | ||
Collections.shuffle(shuffledServerIndices); | ||
return shuffledServerIndices; | ||
}, 5, TimeUnit.SECONDS); | ||
|
||
static class DefaultRackManager extends RackManager { | ||
@Override | ||
public String getRack(ServerName server) { | ||
|
@@ -711,6 +726,44 @@ enum LocalityType { | |
RACK | ||
} | ||
|
||
public List<RegionPlan> convertActionToPlans(BalanceAction action) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RegionPlans are much simpler than balance actions because they can represent the consequences of every type of balance action. With that in mind, I chose to have balancer conditionals only be concerned with RegionPlans so that their implementations could be simpler — but it means that I need to convert BalanceActions to region plans, without applying the plans, for evaluation against conditionals. Thus, this method needed to be added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: how would you feel about giving the |
||
switch (action.getType()) { | ||
case NULL: | ||
break; | ||
case ASSIGN_REGION: | ||
// FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings | ||
assert action instanceof AssignRegionAction : action.getClass(); | ||
AssignRegionAction ar = (AssignRegionAction) action; | ||
return ImmutableList | ||
.of(new RegionPlan(regions[ar.getRegion()], null, servers[ar.getServer()])); | ||
case MOVE_REGION: | ||
assert action instanceof MoveRegionAction : action.getClass(); | ||
MoveRegionAction mra = (MoveRegionAction) action; | ||
return ImmutableList.of(new RegionPlan(regions[mra.getRegion()], | ||
servers[mra.getFromServer()], servers[mra.getToServer()])); | ||
case SWAP_REGIONS: | ||
assert action instanceof SwapRegionsAction : action.getClass(); | ||
SwapRegionsAction a = (SwapRegionsAction) action; | ||
return ImmutableList.of( | ||
new RegionPlan(regions[a.getFromRegion()], servers[a.getFromServer()], | ||
servers[a.getToServer()]), | ||
new RegionPlan(regions[a.getToRegion()], servers[a.getToServer()], | ||
servers[a.getFromServer()])); | ||
case MOVE_BATCH: | ||
assert action instanceof MoveBatchAction : action.getClass(); | ||
MoveBatchAction mba = (MoveBatchAction) action; | ||
List<RegionPlan> mbRegionPlans = new ArrayList<>(); | ||
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { | ||
mbRegionPlans.add(new RegionPlan(regions[moveRegionAction.getRegion()], | ||
servers[moveRegionAction.getFromServer()], servers[moveRegionAction.getToServer()])); | ||
} | ||
return mbRegionPlans; | ||
default: | ||
throw new RuntimeException("Unknown action:" + action.getType()); | ||
} | ||
return Collections.emptyList(); | ||
} | ||
|
||
public void doAction(BalanceAction action) { | ||
switch (action.getType()) { | ||
case NULL: | ||
|
@@ -742,8 +795,25 @@ public void doAction(BalanceAction action) { | |
regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); | ||
regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); | ||
break; | ||
case MOVE_BATCH: | ||
assert action instanceof MoveBatchAction : action.getClass(); | ||
MoveBatchAction mba = (MoveBatchAction) action; | ||
for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { | ||
Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); | ||
regionsPerServer[serverIndex] = | ||
removeRegions(regionsPerServer[serverIndex], regionsToRemove); | ||
} | ||
for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { | ||
Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); | ||
regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); | ||
} | ||
for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { | ||
regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), | ||
moveRegionAction.getToServer()); | ||
} | ||
break; | ||
default: | ||
throw new RuntimeException("Uknown action:" + action.getType()); | ||
throw new RuntimeException("Unknown action:" + action.getType()); | ||
} | ||
} | ||
|
||
|
@@ -905,6 +975,52 @@ int[] addRegion(int[] regions, int regionIndex) { | |
return newRegions; | ||
} | ||
|
||
int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method, and the below addRegions, are just a nicer way to add/remove regions from the BCS arrays in bulk |
||
// Calculate the size of the new regions array | ||
int newSize = regions.length - regionIndicesToRemove.size(); | ||
if (newSize < 0) { | ||
throw new IllegalStateException( | ||
"Region indices mismatch: more regions to remove than in the regions array"); | ||
} | ||
|
||
int[] newRegions = new int[newSize]; | ||
int newIndex = 0; | ||
|
||
// Copy only the regions not in the removal set | ||
for (int region : regions) { | ||
if (!regionIndicesToRemove.contains(region)) { | ||
newRegions[newIndex++] = region; | ||
} | ||
} | ||
|
||
// If the newIndex is smaller than newSize, some regions were missing from the input array | ||
if (newIndex != newSize) { | ||
throw new IllegalStateException("Region indices mismatch: some regions in the removal " | ||
+ "set were not found in the regions array"); | ||
} | ||
|
||
return newRegions; | ||
} | ||
|
||
int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) { | ||
int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; | ||
|
||
// Copy the existing regions to the new array | ||
System.arraycopy(regions, 0, newRegions, 0, regions.length); | ||
|
||
// Add the new regions at the end of the array | ||
int newIndex = regions.length; | ||
for (int regionIndex : regionIndicesToAdd) { | ||
newRegions[newIndex++] = regionIndex; | ||
} | ||
|
||
return newRegions; | ||
} | ||
|
||
List<Integer> getShuffledServerIndices() { | ||
return shuffledServerIndicesSupplier.get(); | ||
} | ||
|
||
int[] addRegionSorted(int[] regions, int regionIndex) { | ||
int[] newRegions = new int[regions.length + 1]; | ||
int i = 0; | ||
|
@@ -1004,6 +1120,10 @@ void setNumMovedRegions(int numMovedRegions) { | |
this.numMovedRegions = numMovedRegions; | ||
} | ||
|
||
public int getMaxReplicas() { | ||
return maxReplicas; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
StringBuilder desc = new StringBuilder("Cluster={servers=["); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conditional candidate generators will look at the entire cluster state and produce a series of moves that should bring us closer to compliance. To support this, I thought a "batch move" balance action made sense.