diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java index 56b473ae710c..ab9395fef163 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java @@ -28,6 +28,7 @@ enum Type { ASSIGN_REGION, MOVE_REGION, SWAP_REGIONS, + MOVE_BATCH, NULL, } @@ -51,6 +52,10 @@ Type getType() { return type; } + long getStepCount() { + return 1; + } + @Override public String toString() { return type + ":"; diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index d60a29a6a70d..7dc9215afa37 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.agrona.collections.Hashing; import org.agrona.collections.Int2IntCounterMap; import org.apache.hadoop.hbase.HDFSBlocksDistribution; @@ -33,12 +35,17 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.net.Address; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Supplier; +import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; +import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + /** * An efficient array based implementation similar to ClusterState for keeping the status of the * cluster in terms of region assignment and distribution. LoadBalancers, such as @@ -123,6 +130,14 @@ class BalancerClusterState { // Maps regionName -> oldServerName -> cache ratio of the region on the old server Map> regionCacheRatioOnOldServerMap; + private Supplier> shuffledServerIndicesSupplier = + Suppliers.memoizeWithExpiration(() -> { + Collection serverIndices = serversToIndex.values(); + List shuffledServerIndices = new ArrayList<>(serverIndices); + Collections.shuffle(shuffledServerIndices); + return shuffledServerIndices; + }, 5, TimeUnit.SECONDS); + static class DefaultRackManager extends RackManager { @Override public String getRack(ServerName server) { @@ -711,6 +726,44 @@ enum LocalityType { RACK } + public List convertActionToPlans(BalanceAction action) { + switch (action.getType()) { + case NULL: + break; + case ASSIGN_REGION: + // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings + assert action instanceof AssignRegionAction : action.getClass(); + AssignRegionAction ar = (AssignRegionAction) action; + return ImmutableList + .of(new RegionPlan(regions[ar.getRegion()], null, servers[ar.getServer()])); + case MOVE_REGION: + assert action instanceof MoveRegionAction : action.getClass(); + MoveRegionAction mra = (MoveRegionAction) action; + return ImmutableList.of(new RegionPlan(regions[mra.getRegion()], + servers[mra.getFromServer()], servers[mra.getToServer()])); + case SWAP_REGIONS: + assert action instanceof SwapRegionsAction : action.getClass(); + SwapRegionsAction a = (SwapRegionsAction) action; + return ImmutableList.of( + new RegionPlan(regions[a.getFromRegion()], servers[a.getFromServer()], + servers[a.getToServer()]), + new RegionPlan(regions[a.getToRegion()], servers[a.getToServer()], + servers[a.getFromServer()])); + case MOVE_BATCH: + assert action 
instanceof MoveBatchAction : action.getClass(); + MoveBatchAction mba = (MoveBatchAction) action; + List mbRegionPlans = new ArrayList<>(); + for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { + mbRegionPlans.add(new RegionPlan(regions[moveRegionAction.getRegion()], + servers[moveRegionAction.getFromServer()], servers[moveRegionAction.getToServer()])); + } + return mbRegionPlans; + default: + throw new RuntimeException("Unknown action:" + action.getType()); + } + return Collections.emptyList(); + } + public void doAction(BalanceAction action) { switch (action.getType()) { case NULL: @@ -742,8 +795,25 @@ public void doAction(BalanceAction action) { regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); break; + case MOVE_BATCH: + assert action instanceof MoveBatchAction : action.getClass(); + MoveBatchAction mba = (MoveBatchAction) action; + for (int serverIndex : mba.getServerToRegionsToRemove().keySet()) { + Set regionsToRemove = mba.getServerToRegionsToRemove().get(serverIndex); + regionsPerServer[serverIndex] = + removeRegions(regionsPerServer[serverIndex], regionsToRemove); + } + for (int serverIndex : mba.getServerToRegionsToAdd().keySet()) { + Set regionsToAdd = mba.getServerToRegionsToAdd().get(serverIndex); + regionsPerServer[serverIndex] = addRegions(regionsPerServer[serverIndex], regionsToAdd); + } + for (MoveRegionAction moveRegionAction : mba.getMoveActions()) { + regionMoved(moveRegionAction.getRegion(), moveRegionAction.getFromServer(), + moveRegionAction.getToServer()); + } + break; default: - throw new RuntimeException("Uknown action:" + action.getType()); + throw new RuntimeException("Unknown action:" + action.getType()); } } @@ -905,6 +975,52 @@ int[] addRegion(int[] regions, int regionIndex) { return newRegions; } + /** Copies regions minus regionIndicesToRemove; throws IllegalStateException if counts disagree. */ + int[] removeRegions(int[] regions, Set regionIndicesToRemove) { + int newSize = 
regions.length - regionIndicesToRemove.size(); + if (newSize < 0) { + throw new IllegalStateException( + "Region indices mismatch: more regions to remove than in the regions array"); + } + + int[] newRegions = new int[newSize]; + int newIndex = 0; + // Copy only the regions not in the removal set; count survivors but never write past newSize + for (int region : regions) { + if (!regionIndicesToRemove.contains(region)) { + if (newIndex < newSize) { + newRegions[newIndex] = region; + } + newIndex++; + } + } + // A survivor count other than newSize means the removal set does not match the regions array + if (newIndex != newSize) { + throw new IllegalStateException("Region indices mismatch: some regions in the removal " + "set were not found in the regions array"); + } + return newRegions; + } + + int[] addRegions(int[] regions, Set regionIndicesToAdd) { + int[] newRegions = new int[regions.length + regionIndicesToAdd.size()]; + + // Copy the existing regions to the new array + System.arraycopy(regions, 0, newRegions, 0, regions.length); + + // Add the new regions at the end of the array + int newIndex = regions.length; + for (int regionIndex : regionIndicesToAdd) { + newRegions[newIndex++] = regionIndex; + } + + return newRegions; + } + + List getShuffledServerIndices() { + return shuffledServerIndicesSupplier.get(); + } + int[] addRegionSorted(int[] regions, int regionIndex) { int[] newRegions = new int[regions.length + 1]; int i = 0; @@ -1004,6 +1120,10 @@ void setNumMovedRegions(int numMovedRegions) { this.numMovedRegions = numMovedRegions; } + public int getMaxReplicas() { + return maxReplicas; + } + @Override public String toString() { StringBuilder desc = new StringBuilder("Cluster={servers=["); diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java new file mode 100644 index 000000000000..d89c813ac137 --- /dev/null +++ 
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionals.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + +/** + * Balancer conditionals supplement cost functions in the {@link StochasticLoadBalancer}. 
Cost + * functions are insufficient and difficult to work with when making discrete decisions; this is + * because they operate on a continuous scale, and each cost function's multiplier affects the + * relative importance of every other cost function. So it is difficult to meaningfully and clearly + * value many aspects of your region distribution via cost functions alone. Conditionals allow you + * to very clearly define discrete rules that your balancer would ideally follow. To clarify, a + * conditional violation will not block a region assignment because we would prefer to have uptime + * than have perfectly intentional balance. But conditionals allow you to, for example, define that + * a region's primary and secondary should not live on the same rack. Another example, conditionals + * make it easy to define that system tables will ideally be isolated on their own RegionServer + * (without needing to manage distinct RegionServer groups). Use of conditionals may cause an + * extremely unbalanced cluster to exceed its max balancer runtime. This is necessary because + * conditional candidate generation is quite expensive, and cutting it off early could prevent us + * from finding a solution. 
+ */ +@InterfaceAudience.Private +final class BalancerConditionals implements Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionals.class); + + static final BalancerConditionals INSTANCE = new BalancerConditionals(); + public static final String DISTRIBUTE_REPLICAS_KEY = + "hbase.master.balancer.stochastic.conditionals.distributeReplicas"; + public static final boolean DISTRIBUTE_REPLICAS_DEFAULT = false; + + public static final String ADDITIONAL_CONDITIONALS_KEY = + "hbase.master.balancer.stochastic.additionalConditionals"; + + private Set> conditionalClasses = Collections.emptySet(); + private Set conditionals = Collections.emptySet(); + private Configuration conf; + + private BalancerConditionals() { + } + + boolean shouldRunBalancer(BalancerClusterState cluster) { + return isConditionalBalancingEnabled() && conditionals.stream() + .map(RegionPlanConditional::getCandidateGenerators).flatMap(Collection::stream) + .map(generator -> generator.getWeight(cluster)).anyMatch(weight -> weight > 0); + } + + Set> getConditionalClasses() { + return Set.copyOf(conditionalClasses); + } + + Collection getConditionals() { + return conditionals; + } + + boolean isReplicaDistributionEnabled() { + return conditionalClasses.contains(DistributeReplicasConditional.class); + } + + boolean shouldSkipSloppyServerEvaluation() { + return isConditionalBalancingEnabled(); + } + + boolean isConditionalBalancingEnabled() { + return !conditionalClasses.isEmpty(); + } + + void clearConditionalWeightCaches() { + conditionals.stream().map(RegionPlanConditional::getCandidateGenerators) + .flatMap(Collection::stream) + .forEach(RegionPlanConditionalCandidateGenerator::clearWeightCache); + } + + void loadClusterState(BalancerClusterState cluster) { + conditionals = conditionalClasses.stream().map(clazz -> createConditional(clazz, conf, cluster)) + .filter(Objects::nonNull).collect(Collectors.toSet()); + } + + /** + * Indicates whether the action is good 
for our conditional compliance. + * @param cluster The cluster state + * @param action The proposed action + * @return -1 if conditionals improve, 0 if neutral, 1 if conditionals degrade + */ + int getViolationCountChange(BalancerClusterState cluster, BalanceAction action) { + boolean isViolatingPre = isViolating(cluster, action.undoAction()); + boolean isViolatingPost = isViolating(cluster, action); + if (isViolatingPre == isViolatingPost) { + return 0; // compliance is unchanged by the move, whether violating or not + } else if (isViolatingPost) { + return 1; + } else { + return -1; + } + } + + /** + * Check if the proposed action violates conditionals + * @param cluster The cluster state + * @param action The proposed action + */ + boolean isViolating(BalancerClusterState cluster, BalanceAction action) { + conditionals.forEach(conditional -> conditional.refreshClusterState(cluster)); + if (conditionals.isEmpty()) { + return false; + } + List regionPlans = cluster.convertActionToPlans(action); + for (RegionPlan regionPlan : regionPlans) { + if (isViolating(regionPlan)) { + return true; + } + } + return false; + } + + private boolean isViolating(RegionPlan regionPlan) { + for (RegionPlanConditional conditional : conditionals) { + if (conditional.isViolating(regionPlan)) { + return true; + } + } + return false; + } + + private RegionPlanConditional createConditional(Class clazz, + Configuration conf, BalancerClusterState cluster) { + if (conf == null) { + conf = new Configuration(); + } + if (cluster == null) { + cluster = new BalancerClusterState(Collections.emptyMap(), null, null, null, null); + } + try { + Constructor ctor = + clazz.getDeclaredConstructor(Configuration.class, BalancerClusterState.class); + return ReflectionUtils.instantiate(clazz.getName(), ctor, conf, cluster); + } catch (NoSuchMethodException e) { + LOG.warn("Cannot find constructor with Configuration and " + + "BalancerClusterState parameters for class '{}': {}", clazz.getName(), e.getMessage()); + } + return null; + } + + @Override + 
public void setConf(Configuration conf) { + this.conf = conf; + ImmutableSet.Builder> conditionalClasses = + ImmutableSet.builder(); + + boolean distributeReplicas = + conf.getBoolean(DISTRIBUTE_REPLICAS_KEY, DISTRIBUTE_REPLICAS_DEFAULT); + if (distributeReplicas) { + conditionalClasses.add(DistributeReplicasConditional.class); + } + + Class[] classes = conf.getClasses(ADDITIONAL_CONDITIONALS_KEY); + for (Class clazz : classes) { + if (!RegionPlanConditional.class.isAssignableFrom(clazz)) { + LOG.warn("Class {} is not a RegionPlanConditional", clazz.getName()); + continue; + } + conditionalClasses.add(clazz.asSubclass(RegionPlanConditional.class)); + } + this.conditionalClasses = conditionalClasses.build(); + ReplicaKeyCache.INSTANCE.setConf(conf); + loadClusterState(null); + } + + @Override + public Configuration getConf() { + return conf; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 54516868a0a0..c3f17d88cde0 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -73,6 +73,9 @@ public abstract class BaseLoadBalancer implements LoadBalancer { public static final boolean DEFAULT_HBASE_MASTER_LOADBALANCE_BYTABLE = false; + public static final String REGIONS_SLOP_KEY = "hbase.regions.slop"; + public static final float REGIONS_SLOP_DEFAULT = 0.2f; + protected static final int MIN_SERVER_BALANCE = 2; private volatile boolean stopped = false; @@ -149,7 +152,9 @@ protected final boolean sloppyRegionServerExist(ClusterLoadState cs) { float average = cs.getLoadAverage(); // for logging int floor = (int) Math.floor(average * (1 - slop)); int ceiling = (int) Math.ceil(average * (1 + slop)); - if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) { + int 
maxLoad = cs.getMaxLoad(); + int minLoad = cs.getMinLoad(); + if (!(maxLoad > ceiling || minLoad < floor)) { NavigableMap> serversByLoad = cs.getServersByLoad(); if (LOG.isTraceEnabled()) { // If nothing to balance, then don't say anything unless trace-level logging. @@ -391,7 +396,7 @@ public Map> retainAssignment(Map moveRegionActions = new ArrayList<>(); + for (int sourceIndex : cluster.getShuffledServerIndices()) { + int[] serverRegions = cluster.regionsPerServer[sourceIndex]; + Set replicaKeys = new HashSet<>(serverRegions.length); + for (int regionIndex : serverRegions) { + ReplicaKey replicaKey = getReplicaKey(cluster.regions[regionIndex]); + if (replicaKeys.contains(replicaKey)) { + foundColocatedReplicas = true; + if (isWeighing) { + // If weighing, fast exit with an actionable move + return getAction(sourceIndex, regionIndex, pickOtherRandomServer(cluster, sourceIndex), + -1); + } else { + // If not weighing, pick a good move + for (int i = 0; i < cluster.numServers; i++) { + // Randomize destination ordering so we aren't overloading one destination + int destinationIndex = pickOtherRandomServer(cluster, sourceIndex); + if (destinationIndex == sourceIndex) { + continue; + } + MoveRegionAction possibleAction = + new MoveRegionAction(regionIndex, sourceIndex, destinationIndex); + if (isForced) { + return possibleAction; + } else if (willBeAccepted(cluster, possibleAction)) { + cluster.doAction(possibleAction); // Update cluster state to reflect move + moveRegionActions.add(possibleAction); + break; + } + } + } + } else { + replicaKeys.add(replicaKey); + } + if (moveRegionActions.size() >= BATCH_SIZE) { + break; + } + } + if (moveRegionActions.size() >= BATCH_SIZE) { + break; + } + } + + if (!moveRegionActions.isEmpty()) { + return batchMovesAndResetClusterState(cluster, moveRegionActions); + } + // If no colocated replicas are found, return NULL_ACTION + if (foundColocatedReplicas) { + LOG.warn("Could not find a place to put a colocated replica! 
We will force a move."); + return generateCandidate(cluster, isWeighing, true); + } else { + LOG.trace("No colocated replicas found. No balancing action required."); + } + return BalanceAction.NULL_ACTION; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java new file mode 100644 index 000000000000..34b11b563617 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/DistributeReplicasConditional.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; + +/** + * If enabled, this class will help the balancer ensure that replicas aren't placed on the same + * servers or racks as their primary. Configure this via + * {@link BalancerConditionals#DISTRIBUTE_REPLICAS_KEY} + */ +@InterfaceAudience.Private +public class DistributeReplicasConditional extends RegionPlanConditional { + + /** + * Local mini cluster tests are only run on one host/rack by design. If enabled, this will pretend + * that only server isolation is necessary for sufficient replica distribution. This should only + * be used in tests. 
+ */ + public static final String TEST_MODE_ENABLED_KEY = + "hbase.replica.distribution.conditional.testModeEnabled"; + + private static final Logger LOG = LoggerFactory.getLogger(DistributeReplicasConditional.class); + + private final boolean isTestModeEnabled; + private final List<RegionPlanConditionalCandidateGenerator> candidateGenerators; + + public DistributeReplicasConditional(Configuration conf, BalancerClusterState cluster) { + super(conf, cluster); + this.isTestModeEnabled = conf.getBoolean(TEST_MODE_ENABLED_KEY, false); + this.candidateGenerators = ImmutableList.of(DistributeReplicasCandidateGenerator.INSTANCE, + new SlopFixingCandidateGenerator(conf.getFloat(BaseLoadBalancer.REGIONS_SLOP_KEY, + BaseLoadBalancer.REGIONS_SLOP_DEFAULT))); + } + + @Override + public ValidationLevel getValidationLevel() { + if (isTestModeEnabled) { + return ValidationLevel.SERVER; + } + return ValidationLevel.RACK; + } + + @Override + List getCandidateGenerators() { + return candidateGenerators; // built once so each generator's cached weight persists + } + + @Override + boolean isViolatingServer(RegionPlan regionPlan, Set serverRegions) { + return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()), + serverRegions); + } + + @Override + boolean isViolatingHost(RegionPlan regionPlan, Set hostRegions) { + return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()), + hostRegions); + } + + @Override + boolean isViolatingRack(RegionPlan regionPlan, Set rackRegions) { + return checkViolation(regionPlan.getRegionInfo(), getReplicaKey(regionPlan.getRegionInfo()), + rackRegions); + } + + private boolean checkViolation(RegionInfo movingRegion, ReplicaKey movingReplicaKey, + Set destinationRegions) { + for (RegionInfo regionInfo : destinationRegions) { + if (regionInfo.equals(movingRegion)) { + continue; + } + if (getReplicaKey(regionInfo).equals(movingReplicaKey)) { + return true; + } + } + return false; + } + + static ReplicaKey getReplicaKey(RegionInfo regionInfo) { + return ReplicaKeyCache.INSTANCE.getReplicaKey(regionInfo); + } + +} diff --git 
a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java index 8668e7cae3c9..0360a069a023 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -90,7 +90,7 @@ public void setFavoredNodesManager(FavoredNodesManager fnm) { /** Returns any candidate generator in random */ @Override - protected CandidateGenerator getRandomGenerator() { + protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) { Class clazz = shuffledGeneratorClasses.get() .get(ThreadLocalRandom.current().nextInt(candidateGenerators.size())); return candidateGenerators.get(clazz); diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java new file mode 100644 index 000000000000..9aded615a468 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveBatchAction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.List; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; +import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; + +@InterfaceAudience.Private +public class MoveBatchAction extends BalanceAction { + private final List moveActions; + + MoveBatchAction(List moveActions) { + super(Type.MOVE_BATCH); + this.moveActions = moveActions; + } + + @Override + long getStepCount() { + return moveActions.size(); + } + + public HashMultimap getServerToRegionsToRemove() { + return moveActions.stream().collect(Multimaps.toMultimap(MoveRegionAction::getFromServer, + MoveRegionAction::getRegion, HashMultimap::create)); + } + + public HashMultimap getServerToRegionsToAdd() { + return moveActions.stream().collect(Multimaps.toMultimap(MoveRegionAction::getToServer, + MoveRegionAction::getRegion, HashMultimap::create)); + } + + List getMoveActions() { + return moveActions; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java new file mode 100644 index 000000000000..1ad91a15f3fb --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditional.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RegionPlanConditional { + private static final Logger LOG = LoggerFactory.getLogger(RegionPlanConditional.class); + private BalancerClusterState cluster; + + RegionPlanConditional(Configuration conf, BalancerClusterState cluster) { + this.cluster = cluster; + } + + public enum ValidationLevel { + SERVER, // Just check server + HOST, // Check host and server + RACK // Check rack, host, and server + } + + public ValidationLevel getValidationLevel() { + return ValidationLevel.SERVER; + } + + void refreshClusterState(BalancerClusterState cluster) { + this.cluster = cluster; + } + + /** + * Get the candidate generator(s) for this conditional. This can be useful to provide the balancer + * with hints that will appease your conditional. Your conditionals will be triggered in order. + * @return the candidate generator for this conditional + */ + abstract List getCandidateGenerators(); + + /** + * Check if the conditional is violated by the given region plan. 
+ * @param regionPlan the region plan to check + * @return true if the conditional is violated + */ + boolean isViolating(RegionPlan regionPlan) { + if (regionPlan == null) { + return false; + } + int destinationServerIdx = cluster.serversToIndex.get(regionPlan.getDestination().getAddress()); + + // Check Server + int[] destinationRegionIndices = cluster.regionsPerServer[destinationServerIdx]; + Set serverRegions = new HashSet<>(destinationRegionIndices.length); + for (int regionIdx : destinationRegionIndices) { + serverRegions.add(cluster.regions[regionIdx]); + } + if (isViolatingServer(regionPlan, serverRegions)) { + return true; + } + + if (getValidationLevel() == ValidationLevel.SERVER) { + return false; + } + + // Check Host + int hostIdx = cluster.serverIndexToHostIndex[destinationServerIdx]; + int[] hostRegionIndices = cluster.regionsPerHost[hostIdx]; + Set hostRegions = new HashSet<>(hostRegionIndices.length); + for (int regionIdx : hostRegionIndices) { + hostRegions.add(cluster.regions[regionIdx]); + } + if (isViolatingHost(regionPlan, hostRegions)) { + return true; + } + + if (getValidationLevel() == ValidationLevel.HOST) { + return false; + } + + // Check Rack + int rackIdx = cluster.serverIndexToRackIndex[destinationServerIdx]; + int[] rackRegionIndices = cluster.regionsPerRack[rackIdx]; + Set rackRegions = new HashSet<>(rackRegionIndices.length); + for (int regionIdx : rackRegionIndices) { + rackRegions.add(cluster.regions[regionIdx]); + } + if (isViolatingRack(regionPlan, rackRegions)) { + return true; + } + + return false; + } + + abstract boolean isViolatingServer(RegionPlan regionPlan, Set destinationRegions); + + boolean isViolatingHost(RegionPlan regionPlan, Set destinationRegions) { + return false; + } + + boolean isViolatingRack(RegionPlan regionPlan, Set destinationRegions) { + return false; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java 
b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java new file mode 100644 index 000000000000..0642c890d6a3 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionPlanConditionalCandidateGenerator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import java.time.Duration; +import java.util.List; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class RegionPlanConditionalCandidateGenerator extends CandidateGenerator { + + private static final Logger LOG = + LoggerFactory.getLogger(RegionPlanConditionalCandidateGenerator.class); + + private static final Duration WEIGHT_CACHE_TTL = Duration.ofMinutes(1); + private long lastWeighedAt = -1; + private double lastWeight = 0.0; + + abstract BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing); + + @Override + BalanceAction generate(BalancerClusterState cluster) { + BalanceAction balanceAction = generateCandidate(cluster, false); + if (!willBeAccepted(cluster, balanceAction)) { + LOG.debug("Generated action is not widely accepted by all conditionals. " + + "Likely we are finding our way out of a deadlock. 
balanceAction={}", balanceAction); + } + return balanceAction; + } + + MoveBatchAction batchMovesAndResetClusterState(BalancerClusterState cluster, + List moves) { + MoveBatchAction batchAction = new MoveBatchAction(moves); + undoBatchAction(cluster, batchAction); + return batchAction; + } + + boolean willBeAccepted(BalancerClusterState cluster, BalanceAction action) { + return !BalancerConditionals.INSTANCE.isViolating(cluster, action); + } + + void undoBatchAction(BalancerClusterState cluster, MoveBatchAction batchAction) { + for (int i = batchAction.getMoveActions().size() - 1; i >= 0; i--) { + MoveRegionAction action = batchAction.getMoveActions().get(i); + cluster.doAction(action.undoAction()); + } + } + + void clearWeightCache() { + lastWeighedAt = -1; + } + + double getWeight(BalancerClusterState cluster) { + boolean hasCandidate = false; + + // Candidate generation is expensive, so for re-weighing generators we will cache + // the value for a bit + if (System.currentTimeMillis() - lastWeighedAt < WEIGHT_CACHE_TTL.toMillis()) { + return lastWeight; + } else { + hasCandidate = generateCandidate(cluster, true) != BalanceAction.NULL_ACTION; + lastWeighedAt = System.currentTimeMillis(); + } + + if (hasCandidate) { + // If this generator has something to do, then it's important + lastWeight = CandidateGenerator.MAX_WEIGHT; + } else { + lastWeight = 0; + } + return lastWeight; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java new file mode 100644 index 000000000000..378758b50047 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SlopFixingCandidateGenerator.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple candidate generator that attempts to move regions from the most-loaded servers to the + * least-loaded servers. 
+ */ +@InterfaceAudience.Private +final class SlopFixingCandidateGenerator extends RegionPlanConditionalCandidateGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SlopFixingCandidateGenerator.class); + + private final float slop; + + SlopFixingCandidateGenerator(float slop) { + this.slop = slop; + } + + @Override + BalanceAction generateCandidate(BalancerClusterState cluster, boolean isWeighing) { + ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); + float average = cs.getLoadAverage(); + int ceiling = (int) Math.ceil(average * (1 + slop)); + Set sloppyServerIndices = new HashSet<>(); + for (int i = 0; i < cluster.numServers; i++) { + int regionCount = cluster.regionsPerServer[i].length; + if (regionCount > ceiling) { + sloppyServerIndices.add(i); + } + } + + if (sloppyServerIndices.isEmpty()) { + LOG.trace("No action to take because no sloppy servers exist."); + return BalanceAction.NULL_ACTION; + } + + List moves = new ArrayList<>(); + Set fixedServers = new HashSet<>(); + for (int sourceServer : sloppyServerIndices) { + for (int regionIdx : cluster.regionsPerServer[sourceServer]) { + boolean regionFoundMove = false; + for (ServerAndLoad serverAndLoad : cs.getServersByLoad().keySet()) { + ServerName destinationServer = serverAndLoad.getServerName(); + int destinationServerIdx = cluster.serversToIndex.get(destinationServer.getAddress()); + int regionsOnDestination = cluster.regionsPerServer[destinationServerIdx].length; + if (regionsOnDestination < average) { + MoveRegionAction move = + new MoveRegionAction(regionIdx, sourceServer, destinationServerIdx); + if (willBeAccepted(cluster, move)) { + if (isWeighing) { + // Fast exit for weighing candidate + return move; + } + moves.add(move); + cluster.doAction(move); + regionFoundMove = true; + break; + } + } else { + fixedServers.add(serverAndLoad); + } + } + fixedServers.forEach(s -> cs.getServersByLoad().remove(s)); + fixedServers.clear(); + if (!regionFoundMove) { + 
LOG.debug("Could not find a destination for region {} from server {}.", regionIdx, + sourceServer); + } + if (cluster.regionsPerServer[sourceServer].length <= ceiling) { + break; + } + } + } + + MoveBatchAction batch = new MoveBatchAction(moves); + undoBatchAction(cluster, batch); + return moves.isEmpty() ? BalanceAction.NULL_ACTION : batch; + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 31d02fa71d6e..b71638cad3c3 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -51,7 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; /** @@ -173,6 +172,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return shuffled; }, 5, TimeUnit.SECONDS); + private final BalancerConditionals balancerConditionals = BalancerConditionals.INSTANCE; + /** * The constructor that pass a MetricsStochasticBalancer to BaseLoadBalancer to replace its * default MetricsBalancer @@ -226,16 +227,25 @@ Map, CandidateGenerator> getCandidateGenerat protected Map, CandidateGenerator> createCandidateGenerators() { - Map, CandidateGenerator> candidateGenerators = - new HashMap<>(5); - candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); - candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); - candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); - candidateGenerators.put(RegionReplicaCandidateGenerator.class, - new RegionReplicaCandidateGenerator()); - candidateGenerators.put(RegionReplicaRackCandidateGenerator.class, - new 
RegionReplicaRackCandidateGenerator()); - return candidateGenerators; + if (balancerConditionals.isReplicaDistributionEnabled()) { + Map, CandidateGenerator> candidateGenerators = + new HashMap<>(3); + candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); + candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); + candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); + return candidateGenerators; + } else { + Map, CandidateGenerator> candidateGenerators = + new HashMap<>(5); + candidateGenerators.put(RandomCandidateGenerator.class, new RandomCandidateGenerator()); + candidateGenerators.put(LoadCandidateGenerator.class, new LoadCandidateGenerator()); + candidateGenerators.put(LocalityBasedCandidateGenerator.class, localityCandidateGenerator); + candidateGenerators.put(RegionReplicaCandidateGenerator.class, + new RegionReplicaCandidateGenerator()); + candidateGenerators.put(RegionReplicaRackCandidateGenerator.class, + new RegionReplicaRackCandidateGenerator()); + return candidateGenerators; + } } protected List createCostFunctions(Configuration conf) { @@ -270,6 +280,8 @@ protected void loadConf(Configuration conf) { localityCost = new ServerLocalityCostFunction(conf); rackLocalityCost = new RackLocalityCostFunction(conf); + // Order is important here. 
We need to construct conditionals to load candidate generators + balancerConditionals.setConf(conf); this.candidateGenerators = createCandidateGenerators(); regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf); @@ -343,6 +355,11 @@ void updateMetricsSize(int size) { } private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) { + if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) { + // This check is unnecessary without replicas, or with conditional replica distribution + // The balancer will auto-run if conditional replica distribution candidates are available + return false; + } if (c.numHosts >= c.maxReplicas) { regionReplicaHostCostFunction.prepare(c); double hostCost = Math.abs(regionReplicaHostCostFunction.cost()); @@ -356,6 +373,11 @@ private boolean areSomeRegionReplicasColocatedOnHost(BalancerClusterState c) { } private boolean areSomeRegionReplicasColocatedOnRack(BalancerClusterState c) { + if (!c.hasRegionReplicas || balancerConditionals.isReplicaDistributionEnabled()) { + // This check is unnecessary without replicas, or with conditional replica distribution + // The balancer will auto-run if conditional replica distribution candidates are available + return false; + } if (c.numRacks >= c.maxReplicas) { regionReplicaRackCostFunction.prepare(c); double rackCost = Math.abs(regionReplicaRackCostFunction.cost()); @@ -420,6 +442,11 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) { return true; } + if (balancerConditionals.shouldRunBalancer(cluster)) { + LOG.info("Running balancer because conditional candidate generators have important moves"); + return true; + } + double total = 0.0; float localSumMultiplier = 0; // in case this.sumMultiplier is not initialized for (CostFunction c : costFunctions) { @@ -439,14 +466,17 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) { costFunctions); LOG.info( "{} - skipping load balancing because weighted 
average imbalance={} <= " - + "threshold({}). If you want more aggressive balancing, either lower " + + "threshold({}) and conditionals do not have opinionated move candidates. " + + "If you want more aggressive balancing, either lower " + "hbase.master.balancer.stochastic.minCostNeedBalance from {} or increase the relative " + "multiplier(s) of the specific cost function(s). functionCost={}", isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", total / sumMultiplier, minCostNeedBalance, minCostNeedBalance, functionCost()); } else { - LOG.info("{} - Calculating plan. may take up to {}ms to complete.", - isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime); + LOG.info( + "{} - Calculating plan. may take up to {}ms to complete. currentCost={}, targetCost={}", + isByTable ? "Table specific (" + tableName + ")" : "Cluster wide", maxRunningTime, + total / sumMultiplier, minCostNeedBalance); } return !balanced; } @@ -454,7 +484,7 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) { @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java") Pair nextAction(BalancerClusterState cluster) { - CandidateGenerator generator = getRandomGenerator(); + CandidateGenerator generator = getRandomGenerator(cluster); return Pair.newPair(generator, generator.generate(cluster)); } @@ -463,8 +493,20 @@ Pair nextAction(BalancerClusterState cluster) * selecting a candidate generator is proportional to the share of cost of all cost functions * among all cost functions that benefit from it. 
*/ - protected CandidateGenerator getRandomGenerator() { - Preconditions.checkState(!candidateGenerators.isEmpty(), "No candidate generators available."); + protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) { + // Prefer conditional generators if they have moves to make + if (balancerConditionals.isConditionalBalancingEnabled()) { + for (RegionPlanConditional conditional : balancerConditionals.getConditionals()) { + List generators = + conditional.getCandidateGenerators(); + for (RegionPlanConditionalCandidateGenerator generator : generators) { + if (generator.getWeight(cluster) > 0) { + return generator; + } + } + } + } + List> generatorClasses = shuffledGeneratorClasses.get(); List partialSums = new ArrayList<>(generatorClasses.size()); double sum = 0.0; @@ -518,6 +560,7 @@ private long calculateMaxSteps(BalancerClusterState cluster) { * approach the optimal state given enough steps. */ @Override + @SuppressWarnings("checkstyle:MethodLength") protected List balanceTable(TableName tableName, Map> loadOfOneTable) { // On clusters with lots of HFileLinks or lots of reference files, @@ -538,6 +581,8 @@ protected List balanceTable(TableName tableName, long startTime = EnvironmentEdgeManager.currentTime(); initCosts(cluster); + balancerConditionals.loadClusterState(cluster); + balancerConditionals.clearConditionalWeightCaches(); float localSumMultiplier = 0; for (CostFunction c : costFunctions) { @@ -586,6 +631,7 @@ protected List balanceTable(TableName tableName, final String initFunctionTotalCosts = totalCostsPerFunc(); // Perform a stochastic walk to see if we can get a good fit. 
long step; + boolean planImprovedConditionals = false; Map, Long> generatorToStepCount = new HashMap<>(); Map, Long> generatorToApprovedActionCount = new HashMap<>(); for (step = 0; step < computedMaxSteps; step++) { @@ -597,16 +643,53 @@ protected List balanceTable(TableName tableName, continue; } - cluster.doAction(action); + int conditionalViolationsChange = 0; + boolean isViolatingConditionals = false; + boolean moveImprovedConditionals = false; + // Only check conditionals if they are enabled + if (balancerConditionals.isConditionalBalancingEnabled()) { + // Always accept a conditional generator output. Sometimes conditional generators + // may need to make controversial moves in order to break what would otherwise + // be a deadlocked situation. + // Otherwise, for normal moves, evaluate the action. + if (RegionPlanConditionalCandidateGenerator.class.isAssignableFrom(generator.getClass())) { + conditionalViolationsChange = -1; + } else { + conditionalViolationsChange = + balancerConditionals.getViolationCountChange(cluster, action); + isViolatingConditionals = balancerConditionals.isViolating(cluster, action); + } + moveImprovedConditionals = conditionalViolationsChange < 0; + if (moveImprovedConditionals) { + planImprovedConditionals = true; + } + } + + // Change state and evaluate costs + try { + cluster.doAction(action); + } catch (IllegalStateException | ArrayIndexOutOfBoundsException e) { + LOG.warn( + "Generator {} produced invalid action! " + + "Debug your candidate generator as this is likely a bug, " + + "and may cause a balancer deadlock. {}", + generator.getClass().getSimpleName(), action, e); + continue; + } updateCostsAndWeightsWithAction(cluster, action); - generatorToStepCount.merge(generator.getClass(), 1L, Long::sum); + generatorToStepCount.merge(generator.getClass(), action.getStepCount(), Long::sum); newCost = computeCost(cluster, currentCost); - // Should this be kept? 
- if (newCost < currentCost) { + boolean conditionalsSimilarCostsImproved = + (newCost < currentCost && conditionalViolationsChange == 0 && !isViolatingConditionals); + // Our first priority is to reduce conditional violations, regardless of cost change + // Our second priority is to reduce balancer cost, provided the move neither + // changes the violation count nor violates any conditional + if (moveImprovedConditionals || conditionalsSimilarCostsImproved) { currentCost = newCost; - generatorToApprovedActionCount.merge(generator.getClass(), 1L, Long::sum); + generatorToApprovedActionCount.merge(generator.getClass(), action.getStepCount(), + Long::sum); // save for JMX curOverallCost = currentCost; @@ -636,7 +719,7 @@ protected List balanceTable(TableName tableName, metricsBalancer.balanceCluster(endTime - startTime); - if (initCost > currentCost) { + if (planImprovedConditionals || (initCost > currentCost)) { updateStochasticCosts(tableName, curOverallCost, curFunctionCosts); List plans = createRegionPlans(cluster); LOG.info( @@ -651,7 +734,8 @@ protected List balanceTable(TableName tableName, } LOG.info( "Could not find a better moving plan. Tried {} different configurations in " - + "{} ms, and did not find anything with an imbalance score less than {}", + + "{} ms, and did not find anything with an imbalance score less than {} " + + "and could not improve conditional violations", step, endTime - startTime, initCost / sumMultiplier); return null; } diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java new file mode 100644 index 000000000000..c5984881ebf4 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKey.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer.replicas; + +import java.util.Arrays; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public final class ReplicaKey { + private final Pair startAndStopKeys; + + public ReplicaKey(RegionInfo regionInfo) { + this.startAndStopKeys = new Pair<>(new ByteArrayWrapper(regionInfo.getStartKey()), + new ByteArrayWrapper(regionInfo.getEndKey())); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ReplicaKey)) { + return false; + } + ReplicaKey other = (ReplicaKey) o; + return this.startAndStopKeys.equals(other.startAndStopKeys); + } + + @Override + public int hashCode() { + return startAndStopKeys.hashCode(); + } + + static class ByteArrayWrapper { + private final byte[] bytes; + + ByteArrayWrapper(byte[] prefix) { + this.bytes = Arrays.copyOf(prefix, prefix.length); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ByteArrayWrapper)) { + return false; + } + ByteArrayWrapper other = (ByteArrayWrapper) o; + return Arrays.equals(this.bytes, other.bytes); + } + + @Override + public int 
hashCode() { + return Arrays.hashCode(bytes); + } + } +} diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java new file mode 100644 index 000000000000..0ef8c122ff8a --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/replicas/ReplicaKeyCache.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer.replicas; + +import java.time.Duration; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; + +@InterfaceAudience.Private +public final class ReplicaKeyCache implements Configurable { + public static final ReplicaKeyCache INSTANCE = new ReplicaKeyCache(); + + /** + * ReplicaKey creation is expensive if you have lots of regions. If your HMaster has adequate + * memory, and you would like balancing to be faster, then you can turn on this flag to cache + * ReplicaKey objects. + */ + public static final String CACHE_REPLICA_KEYS_KEY = + "hbase.replica.distribution.conditional.cacheReplicaKeys"; + public static final boolean CACHE_REPLICA_KEYS_DEFAULT = false; + + /** + * If memory is available, then set this to a value greater than your region count to maximize + * replica distribution performance. 
+ */ + public static final String REPLICA_KEY_CACHE_SIZE_KEY = + "hbase.replica.distribution.conditional.replicaKeyCacheSize"; + public static final int REPLICA_KEY_CACHE_SIZE_DEFAULT = 1000; + + private volatile LoadingCache replicaKeyCache = null; + + private ReplicaKeyCache() { + } + + public ReplicaKey getReplicaKey(RegionInfo regionInfo) { + LoadingCache<RegionInfo, ReplicaKey> cache = replicaKeyCache; + if (cache == null) { + return new ReplicaKey(regionInfo); + } + return cache.getUnchecked(regionInfo); + } + + @Override + public void setConf(Configuration conf) { + boolean cacheKeys = conf.getBoolean(CACHE_REPLICA_KEYS_KEY, CACHE_REPLICA_KEYS_DEFAULT); + if (cacheKeys && replicaKeyCache == null) { + int replicaKeyCacheSize = + conf.getInt(REPLICA_KEY_CACHE_SIZE_KEY, REPLICA_KEY_CACHE_SIZE_DEFAULT); + replicaKeyCache = CacheBuilder.newBuilder().maximumSize(replicaKeyCacheSize) + .expireAfterAccess(Duration.ofMinutes(30)).build(new CacheLoader() { + @Override + public ReplicaKey load(RegionInfo regionInfo) { + return new ReplicaKey(regionInfo); + } + }); + } else if (!cacheKeys && replicaKeyCache != null) { + replicaKeyCache = null; + } + } + + @Override + public Configuration getConf() { + return null; + } +} diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java new file mode 100644 index 000000000000..116ee4fc6574 --- /dev/null +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/CandidateGeneratorTestUtil.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MAX_RUNNING_TIME_KEY; +import static org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionPlan; +import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class CandidateGeneratorTestUtil { + + private static final Logger LOG = LoggerFactory.getLogger(CandidateGeneratorTestUtil.class); + + private CandidateGeneratorTestUtil() { + } + + static void runBalancerToExhaustion(Configuration conf, + Map> serverToRegions, + Set> expectations, float targetMaxBalancerCost) { + // Do the full plan. 
We're testing with a lot of regions + conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true); + conf.setLong(MAX_RUNNING_TIME_KEY, 15000); + + conf.setFloat(MIN_COST_NEED_BALANCE_KEY, targetMaxBalancerCost); + + BalancerClusterState cluster = createMockBalancerClusterState(serverToRegions); + StochasticLoadBalancer stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf); + printClusterDistribution(cluster, 0); + int balancerRuns = 0; + int actionsTaken = 0; + long balancingMillis = 0; + boolean isBalanced = false; + while (!isBalanced) { + balancerRuns++; + if (balancerRuns > 1000) { + throw new RuntimeException("Balancer failed to find balance & meet expectations"); + } + long start = System.currentTimeMillis(); + List regionPlans = + stochasticLoadBalancer.balanceCluster(partitionRegionsByTable(serverToRegions)); + balancingMillis += System.currentTimeMillis() - start; + actionsTaken++; + if (regionPlans != null) { + // Apply all plans to serverToRegions + for (RegionPlan rp : regionPlans) { + ServerName source = rp.getSource(); + ServerName dest = rp.getDestination(); + RegionInfo region = rp.getRegionInfo(); + + // Update serverToRegions + serverToRegions.get(source).remove(region); + serverToRegions.get(dest).add(region); + actionsTaken++; + } + + // Now rebuild cluster and balancer from updated serverToRegions + cluster = createMockBalancerClusterState(serverToRegions); + stochasticLoadBalancer = buildStochasticLoadBalancer(cluster, conf); + } + printClusterDistribution(cluster, actionsTaken); + isBalanced = true; + for (Function condition : expectations) { + // Check if we've met all expectations for the candidate generator + if (!condition.apply(cluster)) { + isBalanced = false; + break; + } + } + if (isBalanced) { // Check if the balancer thinks we're done too + LOG.info("All balancer conditions passed. 
Checking if balancer thinks it's done."); + if (stochasticLoadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, cluster)) { + LOG.info("Balancer would still like to run"); + isBalanced = false; + } else { + LOG.info("Balancer is done"); + } + } + } + LOG.info("Balancing took {}sec", Duration.ofMillis(balancingMillis).getSeconds()); + } + + /** + * Prints the current cluster distribution of regions per table per server + */ + static void printClusterDistribution(BalancerClusterState cluster, long actionsTaken) { + LOG.info("=== Cluster Distribution after {} balancer actions taken ===", actionsTaken); + + for (int i = 0; i < cluster.numServers; i++) { + int[] regions = cluster.regionsPerServer[i]; + int regionCount = (regions == null) ? 0 : regions.length; + + LOG.info("Server {}: {} regions", cluster.servers[i].getServerName(), regionCount); + + if (regionCount > 0) { + Map tableRegionCounts = new HashMap<>(); + + for (int regionIndex : regions) { + RegionInfo regionInfo = cluster.regions[regionIndex]; + TableName tableName = regionInfo.getTable(); + tableRegionCounts.put(tableName, tableRegionCounts.getOrDefault(tableName, 0) + 1); + } + + tableRegionCounts + .forEach((table, count) -> LOG.info(" - Table {}: {} regions", table, count)); + } + } + + LOG.info("==========================================="); + } + + /** + * Partitions the given serverToRegions map by table. The tables are derived from the RegionInfo + * objects found in serverToRegions. + * @param serverToRegions The map of servers to their assigned regions. + * @return A map of tables to their server-to-region assignments. 
+ */ + public static Map>> + partitionRegionsByTable(Map> serverToRegions) { + + // First, gather all tables from the regions + Set allTables = new HashSet<>(); + for (List regions : serverToRegions.values()) { + for (RegionInfo region : regions) { + allTables.add(region.getTable()); + } + } + + Map>> tablesToServersToRegions = new HashMap<>(); + + // Initialize each table with all servers mapped to empty lists + for (TableName table : allTables) { + Map> serverMap = new HashMap<>(); + for (ServerName server : serverToRegions.keySet()) { + serverMap.put(server, new ArrayList<>()); + } + tablesToServersToRegions.put(table, serverMap); + } + + // Distribute regions to their respective tables + for (Map.Entry> serverAndRegions : serverToRegions.entrySet()) { + ServerName server = serverAndRegions.getKey(); + List regions = serverAndRegions.getValue(); + + for (RegionInfo region : regions) { + TableName regionTable = region.getTable(); + // Now we know for sure regionTable is in allTables + Map> tableServerMap = + tablesToServersToRegions.get(regionTable); + tableServerMap.get(server).add(region); + } + } + + return tablesToServersToRegions; + } + + static StochasticLoadBalancer buildStochasticLoadBalancer(BalancerClusterState cluster, + Configuration conf) { + StochasticLoadBalancer stochasticLoadBalancer = + new StochasticLoadBalancer(new DummyMetricsStochasticBalancer()); + stochasticLoadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf)); + stochasticLoadBalancer.loadConf(conf); + stochasticLoadBalancer.initCosts(cluster); + return stochasticLoadBalancer; + } + + static BalancerClusterState + createMockBalancerClusterState(Map> serverToRegions) { + return new BalancerClusterState(serverToRegions, null, null, null, null); + } + + /** + * Validates that each replica is isolated from its others. Ensures that no server hosts more than + * one replica of the same region (i.e., regions with identical start and end keys). 
+ * @param cluster The current state of the cluster. + * @return true if all replicas are properly isolated, false otherwise. + */ + static boolean areAllReplicasDistributed(BalancerClusterState cluster) { + // Iterate over each server + for (int[] regionsPerServer : cluster.regionsPerServer) { + if (regionsPerServer == null || regionsPerServer.length == 0) { + continue; // Skip empty servers + } + + Set foundKeys = new HashSet<>(); + for (int regionIndex : regionsPerServer) { + RegionInfo regionInfo = cluster.regions[regionIndex]; + ReplicaKey replicaKey = new ReplicaKey(regionInfo); + if (foundKeys.contains(replicaKey)) { + // Violation: Multiple replicas of the same region on the same server + LOG.warn("Replica isolation violated: one server hosts multiple replicas of key [{}].", + generateRegionKey(regionInfo)); + return false; + } + + foundKeys.add(replicaKey); + } + } + + LOG.info( + "Replica isolation validation passed: No server hosts multiple replicas of the same region."); + return true; + } + + /** + * Generates a unique key for a region based on its start and end keys. This method ensures that + * regions with identical start and end keys have the same key. + * @param regionInfo The RegionInfo object. + * @return A string representing the unique key of the region. 
+ */ + private static String generateRegionKey(RegionInfo regionInfo) { + // Using Base64 encoding for byte arrays to ensure uniqueness and readability + String startKey = Base64.getEncoder().encodeToString(regionInfo.getStartKey()); + String endKey = Base64.getEncoder().encodeToString(regionInfo.getEndKey()); + + return regionInfo.getTable().getNameAsString() + ":" + startKey + ":" + endKey; + } + +} diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java new file mode 100644 index 000000000000..949c261cb19d --- /dev/null +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBalancerConditionals.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestBalancerConditionals extends BalancerTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBalancerConditionals.class); + + private BalancerConditionals balancerConditionals; + private BalancerClusterState mockCluster; + + @Before + public void setUp() { + balancerConditionals = BalancerConditionals.INSTANCE; + mockCluster = mockCluster(new int[] { 0, 1, 2 }); + } + + @Test + public void testDefaultConfiguration() { + Configuration conf = new Configuration(); + balancerConditionals.setConf(conf); + balancerConditionals.loadClusterState(mockCluster); + + assertEquals("No conditionals should be loaded by default", 0, + balancerConditionals.getConditionalClasses().size()); + } + + @Test + public void testCustomConditionalsViaConfiguration() { + Configuration conf = new Configuration(); + conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, + DistributeReplicasConditional.class.getName()); + + balancerConditionals.setConf(conf); + balancerConditionals.loadClusterState(mockCluster); + + assertTrue("Custom conditionals should be loaded", + balancerConditionals.shouldSkipSloppyServerEvaluation()); + } + + @Test + public void testInvalidCustomConditionalClass() { + Configuration conf = new Configuration(); + conf.set(BalancerConditionals.ADDITIONAL_CONDITIONALS_KEY, "java.lang.String"); + + balancerConditionals.setConf(conf); + balancerConditionals.loadClusterState(mockCluster); + + assertEquals("Invalid classes should not be 
loaded as conditionals", 0, + balancerConditionals.getConditionalClasses().size()); + } + +} diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java new file mode 100644 index 000000000000..946a343256ff --- /dev/null +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestLargeClusterBalancingConditionalReplicaDistribution.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.master.balancer.CandidateGeneratorTestUtil.runBalancerToExhaustion;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.master.balancer.replicas.ReplicaKeyCache;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Places {@code NUM_REGIONS * NUM_REPLICAS} region replicas spread over {@code NUM_TABLES} tables
 * on a single server out of {@code NUM_SERVERS}, then runs the balancer until
 * {@link CandidateGeneratorTestUtil#areAllReplicasDistributed} holds.
 */
@Category(MediumTests.class)
public class TestLargeClusterBalancingConditionalReplicaDistribution {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLargeClusterBalancingConditionalReplicaDistribution.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestLargeClusterBalancingConditionalReplicaDistribution.class);

  private static final int NUM_SERVERS = 1000;
  private static final int NUM_REGIONS = 20_000;
  private static final int NUM_REPLICAS = 3;
  private static final int NUM_TABLES = 100;

  private static final ServerName[] servers = new ServerName[NUM_SERVERS];
  private static final Map<ServerName, List<RegionInfo>> serverToRegions = new HashMap<>();

  @BeforeClass
  public static void setup() {
    // Initialize servers, each starting with an empty assignment list
    for (int i = 0; i < NUM_SERVERS; i++) {
      servers[i] = ServerName.valueOf("server" + i, i, System.currentTimeMillis());
      serverToRegions.put(servers[i], new ArrayList<>());
    }

    // Create primary regions and their replicas
    List<RegionInfo> allRegions = new ArrayList<>(NUM_REGIONS * NUM_REPLICAS);
    for (int i = 0; i < NUM_REGIONS; i++) {
      TableName tableName = getTableName(i);
      // Define startKey and endKey for the region
      byte[] startKey = Bytes.toBytes(i);
      byte[] endKey = Bytes.toBytes(i + 1);

      // Create NUM_REPLICAS replicas (replica ids 0..NUM_REPLICAS-1) sharing the same key range
      for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) {
        RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey)
          .setEndKey(endKey).setReplicaId(replicaId).build();
        allRegions.add(regionInfo);
      }
    }

    // Assign all regions to one server so that every replica starts out co-located
    serverToRegions.get(servers[0]).addAll(allRegions);
  }

  /** Maps a region index onto one of the {@code NUM_TABLES} table names, round-robin. */
  private static TableName getTableName(int i) {
    return TableName.valueOf("userTable" + i % NUM_TABLES);
  }

  @Test
  public void testReplicaDistribution() {
    Configuration conf = new Configuration(false);
    conf.setBoolean(BalancerConditionals.DISTRIBUTE_REPLICAS_KEY, true);
    conf.setBoolean(DistributeReplicasConditional.TEST_MODE_ENABLED_KEY, true);
    conf.setBoolean(ReplicaKeyCache.CACHE_REPLICA_KEYS_KEY, true);
    conf.setInt(ReplicaKeyCache.REPLICA_KEY_CACHE_SIZE_KEY, Integer.MAX_VALUE);
    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 30_000);

    // turn off replica cost functions
    conf.setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
    conf.setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);

    runBalancerToExhaustion(conf, serverToRegions,
      Set.of(CandidateGeneratorTestUtil::areAllReplicasDistributed), 10.0f);
    // This test only configures and checks replica distribution, so the message says only that.
    LOG.info("Region replicas are appropriately distributed across RegionServers.");
  }
}
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java
b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java index a565c14f0fda..5909bfb7ff38 100644 --- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java +++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java @@ -245,7 +245,7 @@ static class StochasticLoadTestBalancer extends StochasticLoadBalancer { } @Override - protected CandidateGenerator getRandomGenerator() { + protected CandidateGenerator getRandomGenerator(BalancerClusterState cluster) { return fairRandomCandidateGenerator; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java new file mode 100644 index 000000000000..8a7169b09309 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerConditionalsTestUtil.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.balancer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.quotas.QuotaUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet; + +public final class BalancerConditionalsTestUtil { + + private static final Logger LOG = LoggerFactory.getLogger(BalancerConditionalsTestUtil.class); + + private BalancerConditionalsTestUtil() { + } + + static byte[][] generateSplits(int numRegions) { + byte[][] splitKeys = new byte[numRegions - 1][]; + for (int i = 0; i < numRegions - 1; i++) { + splitKeys[i] = + Bytes.toBytes(String.format("%09d", (i + 1) * (Integer.MAX_VALUE / numRegions))); + } + return splitKeys; + } + + static void printRegionLocations(Connection connection) throws IOException { + Admin admin = connection.getAdmin(); + + // Get all table names in the cluster + Set tableNames = admin.listTableDescriptors(true).stream() + .map(TableDescriptor::getTableName).collect(Collectors.toSet()); + + // Group regions by server + Map>> serverToRegions = + admin.getClusterMetrics().getLiveServerMetrics().keySet().stream() + .collect(Collectors.toMap(server -> server, server -> { + try { + return listRegionsByTable(connection, server, 
tableNames); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + // Pretty print region locations + StringBuilder regionLocationOutput = new StringBuilder(); + regionLocationOutput.append("Pretty printing region locations...\n"); + serverToRegions.forEach((server, tableRegions) -> { + regionLocationOutput.append("Server: " + server.getServerName() + "\n"); + tableRegions.forEach((table, regions) -> { + if (regions.isEmpty()) { + return; + } + regionLocationOutput.append(" Table: " + table.getNameAsString() + "\n"); + regions.forEach(region -> regionLocationOutput + .append(String.format(" Region: %s, start: %s, end: %s, replica: %s\n", + region.getEncodedName(), Bytes.toString(region.getStartKey()), + Bytes.toString(region.getEndKey()), region.getReplicaId()))); + }); + }); + LOG.info(regionLocationOutput.toString()); + } + + private static Map> listRegionsByTable(Connection connection, + ServerName server, Set tableNames) throws IOException { + Admin admin = connection.getAdmin(); + + // Find regions for each table + return tableNames.stream().collect(Collectors.toMap(tableName -> tableName, tableName -> { + List allRegions = null; + try { + allRegions = admin.getRegions(server); + } catch (IOException e) { + throw new RuntimeException(e); + } + return allRegions.stream().filter(region -> region.getTable().equals(tableName)) + .collect(Collectors.toList()); + })); + } + + static void validateReplicaDistribution(Connection connection, TableName tableName, + boolean shouldBeDistributed) { + Map> serverToRegions = null; + try { + serverToRegions = connection.getRegionLocator(tableName).getAllRegionLocations().stream() + .collect(Collectors.groupingBy(location -> location.getServerName(), + Collectors.mapping(location -> location.getRegion(), Collectors.toList()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + + if (shouldBeDistributed) { + // Ensure no server hosts more than one replica of any region + for (Map.Entry> 
serverAndRegions : serverToRegions.entrySet()) { + List regionInfos = serverAndRegions.getValue(); + Set startKeys = new HashSet<>(); + for (RegionInfo regionInfo : regionInfos) { + // each region should have a distinct start key + assertFalse( + "Each region should have its own start key, " + + "demonstrating it is not a replica of any others on this host", + startKeys.contains(regionInfo.getStartKey())); + startKeys.add(regionInfo.getStartKey()); + } + } + } else { + // Ensure all replicas are on the same server + assertEquals("All regions should share one server", 1, serverToRegions.size()); + } + } + + static void validateRegionLocations(Map> tableToServers, + TableName productTableName, boolean shouldBeBalanced) { + ServerName metaServer = + tableToServers.get(TableName.META_TABLE_NAME).stream().findFirst().orElseThrow(); + ServerName quotaServer = + tableToServers.get(QuotaUtil.QUOTA_TABLE_NAME).stream().findFirst().orElseThrow(); + Set productServers = tableToServers.get(productTableName); + + if (shouldBeBalanced) { + for (ServerName server : productServers) { + assertNotEquals("Meta table and product table should not share servers", server, + metaServer); + assertNotEquals("Quota table and product table should not share servers", server, + quotaServer); + } + assertNotEquals("The meta server and quotas server should be different", metaServer, + quotaServer); + } else { + for (ServerName server : productServers) { + assertEquals("Meta table and product table must share servers", server, metaServer); + assertEquals("Quota table and product table must share servers", server, quotaServer); + } + assertEquals("The meta server and quotas server must be the same", metaServer, quotaServer); + } + } + + static Map> getTableToServers(Connection connection, + Set tableNames) { + return tableNames.stream().collect(Collectors.toMap(t -> t, t -> { + try { + return connection.getRegionLocator(t).getAllRegionLocations().stream() + 
.map(HRegionLocation::getServerName).collect(Collectors.toSet()); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + @FunctionalInterface + interface AssertionRunnable { + void run() throws AssertionError; + } + + static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure, + AssertionRunnable assertion) { + validateAssertionsWithRetries(testUtil, runBalancerOnFailure, ImmutableSet.of(assertion)); + } + + static void validateAssertionsWithRetries(HBaseTestingUtil testUtil, boolean runBalancerOnFailure, + Set assertions) { + int maxAttempts = 50; + for (int i = 0; i < maxAttempts; i++) { + try { + for (AssertionRunnable assertion : assertions) { + assertion.run(); + } + } catch (AssertionError e) { + if (i == maxAttempts - 1) { + throw e; + } + try { + LOG.warn("Failed to validate region locations. Will retry", e); + Thread.sleep(1000); + BalancerConditionalsTestUtil.printRegionLocations(testUtil.getConnection()); + if (runBalancerOnFailure) { + testUtil.getAdmin().balance(); + } + Thread.sleep(1000); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java new file mode 100644 index 000000000000..e60ed8f5957b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestReplicaDistributionBalancerConditional.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hbase.master.balancer;

import static org.apache.hadoop.hbase.master.balancer.BalancerConditionalsTestUtil.validateAssertionsWithRetries;

import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mini-cluster test: creates a table with region replicas, moves every region onto one region
 * server while the balancer is switched off, then switches the balancer back on and waits until no
 * server hosts two replicas of the same region.
 */
@Category(LargeTests.class)
public class TestReplicaDistributionBalancerConditional {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicaDistributionBalancerConditional.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicaDistributionBalancerConditional.class);
  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final int REPLICAS = 3;
  // One server per replica, so a fully distributed placement exists.
  private static final int NUM_SERVERS = REPLICAS;
  private static final int REGIONS_PER_SERVER = 5;

  @Before
  public void setUp() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean(BalancerConditionals.DISTRIBUTE_REPLICAS_KEY, true);
    TEST_UTIL.getConfiguration().setBoolean(DistributeReplicasConditional.TEST_MODE_ENABLED_KEY,
      true);
    TEST_UTIL.getConfiguration()
      .setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
    TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_BALANCER_PERIOD, 1000L);
    TEST_UTIL.getConfiguration().setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);

    // turn off replica cost functions
    TEST_UTIL.getConfiguration()
      .setLong("hbase.master.balancer.stochastic.regionReplicaRackCostKey", 0);
    TEST_UTIL.getConfiguration()
      .setLong("hbase.master.balancer.stochastic.regionReplicaHostCostKey", 0);

    TEST_UTIL.startMiniCluster(NUM_SERVERS);
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testReplicaDistribution() throws Exception {
    Connection connection = TEST_UTIL.getConnection();
    TableName replicatedTableName = TableName.valueOf("replicated_table");

    // The Admin is obtained from the connection here, so it is closed here.
    try (Admin admin = connection.getAdmin()) {
      // Create a "replicated_table" with region replicas
      TableDescriptor replicatedTableDescriptor =
        TableDescriptorBuilder.newBuilder(replicatedTableName)
          .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("0")).build())
          .setRegionReplication(REPLICAS).build();
      admin.createTable(replicatedTableDescriptor,
        BalancerConditionalsTestUtil.generateSplits(REGIONS_PER_SERVER * NUM_SERVERS));

      // Pause the balancer
      admin.balancerSwitch(false, true);

      // Collect all region replicas and place them on one RegionServer
      List<RegionInfo> allRegions = admin.getRegions(replicatedTableName);
      String targetServer =
        TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName().getServerName();

      for (RegionInfo region : allRegions) {
        admin.move(region.getEncodedNameAsBytes(), Bytes.toBytes(targetServer));
      }

      BalancerConditionalsTestUtil.printRegionLocations(TEST_UTIL.getConnection());
      // Precondition: with the balancer off, everything sits on a single server.
      validateAssertionsWithRetries(TEST_UTIL, false, () -> BalancerConditionalsTestUtil
        .validateReplicaDistribution(connection, replicatedTableName, false));

      // Unpause the balancer and trigger balancing
      admin.balancerSwitch(true, true);
      admin.balance();

      validateAssertionsWithRetries(TEST_UTIL, true, () -> BalancerConditionalsTestUtil
        .validateReplicaDistribution(connection, replicatedTableName, true));
      BalancerConditionalsTestUtil.printRegionLocations(TEST_UTIL.getConnection());
    }
  }
}