|
26 | 26 | import java.util.HashMap;
|
27 | 27 | import java.util.List;
|
28 | 28 | import java.util.Map;
|
| 29 | +import java.util.Set; |
| 30 | +import java.util.concurrent.TimeUnit; |
29 | 31 | import org.agrona.collections.Hashing;
|
30 | 32 | import org.agrona.collections.Int2IntCounterMap;
|
31 | 33 | import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
32 | 34 | import org.apache.hadoop.hbase.ServerName;
|
33 | 35 | import org.apache.hadoop.hbase.client.RegionInfo;
|
34 | 36 | import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
35 | 37 | import org.apache.hadoop.hbase.master.RackManager;
|
| 38 | +import org.apache.hadoop.hbase.master.RegionPlan; |
36 | 39 | import org.apache.hadoop.hbase.net.Address;
|
37 | 40 | import org.apache.hadoop.hbase.util.Pair;
|
38 | 41 | import org.apache.yetus.audience.InterfaceAudience;
|
39 | 42 | import org.slf4j.Logger;
|
40 | 43 | import org.slf4j.LoggerFactory;
|
41 | 44 |
|
| 45 | +import org.apache.hbase.thirdparty.com.google.common.base.Supplier; |
| 46 | +import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; |
| 47 | +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; |
| 48 | + |
42 | 49 | /**
|
43 | 50 | * An efficient array based implementation similar to ClusterState for keeping the status of the
|
44 | 51 | * cluster in terms of region assignment and distribution. LoadBalancers, such as
|
@@ -123,6 +130,14 @@ class BalancerClusterState {
|
123 | 130 | // Maps regionName -> oldServerName -> cache ratio of the region on the old server
|
124 | 131 | Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
|
125 | 132 |
|
| 133 | + private Supplier<List<Integer>> shuffledServerIndicesSupplier = |
| 134 | + Suppliers.memoizeWithExpiration(() -> { |
| 135 | + Collection<Integer> serverIndices = serversToIndex.values(); |
| 136 | + List<Integer> shuffledServerIndices = new ArrayList<>(serverIndices); |
| 137 | + Collections.shuffle(shuffledServerIndices); |
| 138 | + return shuffledServerIndices; |
| 139 | + }, 5, TimeUnit.SECONDS); |
| 140 | + |
126 | 141 | static class DefaultRackManager extends RackManager {
|
127 | 142 | @Override
|
128 | 143 | public String getRack(ServerName server) {
|
@@ -711,6 +726,44 @@ enum LocalityType {
|
711 | 726 | RACK
|
712 | 727 | }
|
713 | 728 |
|
| 729 | + public List<RegionPlan> convertActionToPlans(BalanceAction action) { |
| 730 | + switch (action.getType()) { |
| 731 | + case NULL: |
| 732 | + break; |
| 733 | + case ASSIGN_REGION: |
| 734 | + // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings |
| 735 | + assert action instanceof AssignRegionAction : action.getClass(); |
| 736 | + AssignRegionAction ar = (AssignRegionAction) action; |
| 737 | + return ImmutableList |
| 738 | + .of(new RegionPlan(regions[ar.getRegion()], null, servers[ar.getServer()])); |
| 739 | + case MOVE_REGION: |
| 740 | + assert action instanceof MoveRegionAction : action.getClass(); |
| 741 | + MoveRegionAction mra = (MoveRegionAction) action; |
| 742 | + return ImmutableList.of(new RegionPlan(regions[mra.getRegion()], |
| 743 | + servers[mra.getFromServer()], servers[mra.getToServer()])); |
| 744 | + case SWAP_REGIONS: |
| 745 | + assert action instanceof SwapRegionsAction : action.getClass(); |
| 746 | + SwapRegionsAction a = (SwapRegionsAction) action; |
| 747 | + return ImmutableList.of( |
| 748 | + new RegionPlan(regions[a.getFromRegion()], servers[a.getFromServer()], |
| 749 | + servers[a.getToServer()]), |
| 750 | + new RegionPlan(regions[a.getToRegion()], servers[a.getToServer()], |
| 751 | + servers[a.getFromServer()])); |
| 752 | + case MOVE_BATCH: |
| 753 | + assert action instanceof MoveBatchAction : action.getClass(); |
| 754 | + MoveBatchAction mba = (MoveBatchAction) action; |
| 755 | + List<RegionPlan> mbRegionPlans = new ArrayList<>(); |
| 756 | + for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { |
| 757 | + mbRegionPlans.add(new RegionPlan(regions[moveRegionAction.getRegion()], |
| 758 | + servers[moveRegionAction.getFromServer()], servers[moveRegionAction.getToServer()])); |
| 759 | + } |
| 760 | + return mbRegionPlans; |
| 761 | + default: |
| 762 | + throw new RuntimeException("Unknown action:" + action.getType()); |
| 763 | + } |
| 764 | + return Collections.emptyList(); |
| 765 | + } |
| 766 | + |
714 | 767 | public void doAction(BalanceAction action) {
|
715 | 768 | switch (action.getType()) {
|
716 | 769 | case NULL:
|
@@ -742,8 +795,25 @@ public void doAction(BalanceAction action) {
|
742 | 795 | regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer());
|
743 | 796 | regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer());
|
744 | 797 | break;
|
| 798 | + case MOVE_BATCH: |
| 799 | + assert action instanceof MoveBatchAction : action.getClass(); |
| 800 | + MoveBatchAction mba = (MoveBatchAction) action; |
| 801 | + for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { |
| 802 | + Set<Integer> regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); |
| 803 | + regionsPerServer[serverIndex] = |
| 804 | + removeRegions(regionsPerServer[serverIndex], regionsToRemove); |
| 805 | + } |
| 806 | + for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { |
| 807 | + Set<Integer> regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); |
| 808 | + regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); |
| 809 | + } |
| 810 | + for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { |
| 811 | + regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), |
| 812 | + moveRegionAction.getToServer()); |
| 813 | + } |
| 814 | + break; |
745 | 815 | default:
|
746 |
| - throw new RuntimeException("Uknown action:" + action.getType()); |
| 816 | + throw new RuntimeException("Unknown action:" + action.getType()); |
747 | 817 | }
|
748 | 818 | }
|
749 | 819 |
|
@@ -905,6 +975,52 @@ int[] addRegion(int[] regions, int regionIndex) {
|
905 | 975 | return newRegions;
|
906 | 976 | }
|
907 | 977 |
|
| 978 | + int[] removeRegions(int[] regions, Set<Integer> regionIndicesToRemove) { |
| 979 | + // Calculate the size of the new regions array |
| 980 | + int newSize = regions.length - regionIndicesToRemove.size(); |
| 981 | + if (newSize < 0) { |
| 982 | + throw new IllegalStateException( |
| 983 | + "Region indices mismatch: more regions to remove than in the regions array"); |
| 984 | + } |
| 985 | + |
| 986 | + int[] newRegions = new int[newSize]; |
| 987 | + int newIndex = 0; |
| 988 | + |
| 989 | + // Copy only the regions not in the removal set |
| 990 | + for (int region : regions) { |
| 991 | + if (!regionIndicesToRemove.contains(region)) { |
| 992 | + newRegions[newIndex++] = region; |
| 993 | + } |
| 994 | + } |
| 995 | + |
| 996 | + // If the newIndex is smaller than newSize, some regions were missing from the input array |
| 997 | + if (newIndex != newSize) { |
| 998 | + throw new IllegalStateException("Region indices mismatch: some regions in the removal " |
| 999 | + + "set were not found in the regions array"); |
| 1000 | + } |
| 1001 | + |
| 1002 | + return newRegions; |
| 1003 | + } |
| 1004 | + |
| 1005 | + int[] addRegions(int[] regions, Set<Integer> regionIndicesToAdd) { |
| 1006 | + int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; |
| 1007 | + |
| 1008 | + // Copy the existing regions to the new array |
| 1009 | + System.arraycopy(regions, 0, newRegions, 0, regions.length); |
| 1010 | + |
| 1011 | + // Add the new regions at the end of the array |
| 1012 | + int newIndex = regions.length; |
| 1013 | + for (int regionIndex : regionIndicesToAdd) { |
| 1014 | + newRegions[newIndex++] = regionIndex; |
| 1015 | + } |
| 1016 | + |
| 1017 | + return newRegions; |
| 1018 | + } |
| 1019 | + |
| 1020 | + List<Integer> getShuffledServerIndices() { |
| 1021 | + return shuffledServerIndicesSupplier.get(); |
| 1022 | + } |
| 1023 | + |
908 | 1024 | int[] addRegionSorted(int[] regions, int regionIndex) {
|
909 | 1025 | int[] newRegions = new int[regions.length + 1];
|
910 | 1026 | int i = 0;
|
@@ -1004,6 +1120,10 @@ void setNumMovedRegions(int numMovedRegions) {
|
1004 | 1120 | this.numMovedRegions = numMovedRegions;
|
1005 | 1121 | }
|
1006 | 1122 |
|
| 1123 | + public int getMaxReplicas() { |
| 1124 | + return maxReplicas; |
| 1125 | + } |
| 1126 | + |
1007 | 1127 | @Override
|
1008 | 1128 | public String toString() {
|
1009 | 1129 | StringBuilder desc = new StringBuilder("Cluster={servers=[");
|
|
0 commit comments