diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index aaab713112beb..aeb0f3d56eed6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -801,9 +801,18 @@ private Resource getCurrentLimitResource(String nodePartition, return Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit()); } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - // When we doing non-exclusive resource allocation, maximum capacity of - // all queues on this label equals to total resource with the label. - return labelManager.getResourceByLabel(nodePartition, clusterResource); + CapacitySchedulerConfiguration conf = queueContext.getConfiguration(); + boolean queueLimitEnable = conf.getBoolean( + CapacitySchedulerConfiguration.NON_EXCLUSIVE_LABEL_QUEUE_LIMIT_ENABLE, + CapacitySchedulerConfiguration.DEFAULT_NON_EXCLUSIVE_LABEL_QUEUE_LIMIT_ENABLE + ); + if (queueLimitEnable) { + return getQueueMaxResource(nodePartition); + } else { + // When we doing non-exclusive resource allocation, maximum capacity of + // all queues on this label equals to total resource with the label. + return labelManager.getResourceByLabel(nodePartition, clusterResource); + } } return Resources.none(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index ea5c892ce3e5b..658e58694a23f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -301,6 +301,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = true; + @Private + public static final String NON_EXCLUSIVE_LABEL_QUEUE_LIMIT_ENABLE = + PREFIX + "non-exclusive-label.queue-limit-enable"; + + @Private + public static final boolean DEFAULT_NON_EXCLUSIVE_LABEL_QUEUE_LIMIT_ENABLE = false; + @Private public static final String QUEUE_MAPPING = PREFIX + "queue-mappings"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 811b5482cbfbe..55f4cc26260a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -1822,7 +1822,97 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } - + + @Test + @Timeout(value = 60) + public void + testQueueMaxCapacitiesWillBeHonoredWhenNotRespectingExclusivity() + throws Exception { + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + csConf.setBoolean(CapacitySchedulerConfiguration.NON_EXCLUSIVE_LABEL_QUEUE_LIMIT_ENABLE, true); + + // Define top-level queues + csConf.setQueues(ROOT, new String[] {"a", "b"}); + csConf.setCapacityByLabel(ROOT, "x", 100); + + csConf.setCapacity(A, 50); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 50); + csConf.setMaximumCapacityByLabel(A, "x", 50); + csConf.setUserLimit(A, 200); + + csConf.setCapacity(B, 50); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 50); + csConf.setMaximumCapacityByLabel(B, "x", 50); + csConf.setUserLimit(B, 200); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of( + NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + + // app1 -> a + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withQueue("a") + .withUnmanagedAM(false) + .build(); + RMApp app1 = MockRMAppSubmitter.submit(rm1, data); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // app1 asks for 10 partition= containers + am1.allocate("*", 1 * GB, 10, new ArrayList()); + + // NM1 do 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + assertEquals(5, schedulerNode1.getNumContainers()); + + // check non-exclusive containers of LeafQueue is correctly updated + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); + assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey( + "y")); + assertEquals(5, + leafQueue.getIgnoreExclusivityRMContainers().get("x").size()); + + // completes all containers of app1, ignoreExclusivityRMContainers should be + // updated as well. + cs.handle(new AppAttemptRemovedSchedulerEvent( + am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false)); + assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey( + "x")); + + rm1.close(); + } + private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs, String nodePartition, float usedCapacity, float absoluteUsedCapacity) { float epsilon = 1e-6f;