Skip to content

Commit 55e5e15

Browse files
committed
Changes to set the memory allocation per operator in query profile.
Addressing an issue where the memory minimization logic was not considering non-buffered operators. Handling error cases when memory requirements for buffered or non-buffered operators cannot be reduced.
1 parent f4bc8bf commit 55e5e15

25 files changed

+348
-163
lines changed

exec/java-exec/src/main/java/org/apache/drill/exec/ops/OpProfileDef.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@ public class OpProfileDef {
2222
public int operatorId;
2323
public int operatorType;
2424
public int incomingCount;
25+
public long optimalMemoryAllocation;
2526

26-
public OpProfileDef(int operatorId, int operatorType, int incomingCount) {
27+
public OpProfileDef(int operatorId, int operatorType, int incomingCount, long optimalMemoryAllocation) {
2728
this.operatorId = operatorId;
2829
this.operatorType = operatorType;
2930
this.incomingCount = incomingCount;
31+
this.optimalMemoryAllocation = optimalMemoryAllocation;
3032
}
3133
public int getOperatorId(){
3234
return operatorId;

exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl conte
5757
} else {
5858
OpProfileDef def =
5959
new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(),
60-
OperatorUtilities.getChildCount(popConfig));
60+
OperatorUtilities.getChildCount(popConfig), popConfig.getMaxAllocation());
6161
this.stats = context.getStats().newOperatorStats(def, allocator);
6262
}
6363
}

exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class OperatorStats {
4545
public long[] recordsReceivedByInput;
4646
public long[] batchesReceivedByInput;
4747
private long[] schemaCountByInput;
48-
48+
private long optimalMemoryAllocation;
4949

5050
private boolean inProcessing = false;
5151
private boolean inSetup = false;
@@ -62,7 +62,7 @@ public class OperatorStats {
6262
private int inputCount;
6363

6464
public OperatorStats(OpProfileDef def, BufferAllocator allocator){
65-
this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator);
65+
this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), allocator, def.optimalMemoryAllocation);
6666
}
6767

6868
/**
@@ -74,7 +74,7 @@ public OperatorStats(OpProfileDef def, BufferAllocator allocator){
7474
*/
7575

7676
public OperatorStats(OperatorStats original, boolean isClean) {
77-
this(original.operatorId, original.operatorType, original.inputCount, original.allocator);
77+
this(original.operatorId, original.operatorType, original.inputCount, original.allocator, original.optimalMemoryAllocation);
7878

7979
if ( !isClean ) {
8080
inProcessing = original.inProcessing;
@@ -88,7 +88,7 @@ public OperatorStats(OperatorStats original, boolean isClean) {
8888
}
8989

9090
@VisibleForTesting
91-
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator) {
91+
public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAllocator allocator, long initialAllocation) {
9292
super();
9393
this.allocator = allocator;
9494
this.operatorId = operatorId;
@@ -97,6 +97,7 @@ public OperatorStats(int operatorId, int operatorType, int inputCount, BufferAll
9797
this.recordsReceivedByInput = new long[inputCount];
9898
this.batchesReceivedByInput = new long[inputCount];
9999
this.schemaCountByInput = new long[inputCount];
100+
this.optimalMemoryAllocation = initialAllocation;
100101
}
101102

102103
private String assertionError(String msg){
@@ -207,6 +208,7 @@ public OperatorProfile getProfile() {
207208
.setOperatorId(operatorId) //
208209
.setSetupNanos(setupNanos) //
209210
.setProcessNanos(processingNanos)
211+
.setOptimalMemAllocation(optimalMemoryAllocation)
210212
.setWaitNanos(waitNanos);
211213

212214
if (allocator != null) {

exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,18 +101,6 @@ public boolean enforceWidth() {
101101
return getMinParallelizationWidth() > 1;
102102
}
103103

104-
@Override
105-
@JsonIgnore
106-
public long getInitialAllocation() {
107-
return 0;
108-
}
109-
110-
@Override
111-
@JsonIgnore
112-
public long getMaxAllocation() {
113-
return 0;
114-
}
115-
116104
@Override
117105
@JsonIgnore
118106
public boolean canPushdownProjects(List<SchemaPath> columns) {

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorCon
5454
}
5555
//Creating new stat for appending to list
5656
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
57-
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
57+
config.getOperatorType(), OperatorUtilities.getChildCount(config), config.getMaxAllocation()),
5858
this.oContext.getAllocator());
5959
fragmentContext.getStats().addOperatorStats(this.stats);
6060
this.fragmentContext = fragmentContext;

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.drill.exec.planner.fragment;
1919

20+
import com.fasterxml.jackson.core.JsonProcessingException;
2021
import org.apache.commons.lang3.tuple.Pair;
2122
import org.apache.drill.common.exceptions.ExecutionSetupException;
2223
import org.apache.drill.common.util.function.CheckedConsumer;
@@ -29,7 +30,7 @@
2930
import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
3031
import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
3132
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
32-
33+
import com.fasterxml.jackson.databind.ObjectMapper;
3334
import java.util.ArrayList;
3435
import java.util.Collection;
3536
import java.util.HashMap;
@@ -46,6 +47,7 @@
4647
* fragment is based on the cluster state and provided queue configuration.
4748
*/
4849
public class DistributedQueueParallelizer extends SimpleParallelizer {
50+
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueueParallelizer.class);
4951
private final boolean planHasMemory;
5052
private final QueryContext queryContext;
5153
private final QueryResourceManager rm;
@@ -65,9 +67,13 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
6567
if (!planHasMemory) {
6668
final DrillNode drillEndpointNode = DrillNode.create(endpoint);
6769
if (operator.isBufferedOperator(queryContext)) {
68-
return operators.get(drillEndpointNode).get(operator);
70+
Long operatorsMemory = operators.get(drillEndpointNode).get(operator);
71+
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, operatorsMemory);
72+
return operatorsMemory;
6973
} else {
70-
return operator.getMaxAllocation();
74+
Long nonBufferedMemory = (long)operator.getCost().getMemoryCost();
75+
logger.debug(" Memory requirement for the operator {} in endpoint {} is {}", operator, endpoint, nonBufferedMemory);
76+
return nonBufferedMemory;
7177
}
7278
}
7379
else {
@@ -92,10 +98,11 @@ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
9298
*/
9399
public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
94100
Map<DrillbitEndpoint, String> onlineEndpointUUIDs) throws ExecutionSetupException {
95-
96101
if (planHasMemory) {
102+
logger.debug(" Plan already has memory settings. Adjustment of the memory is skipped");
97103
return;
98104
}
105+
logger.info(" Memory adjustment phase triggered");
99106

100107
final Map<DrillNode, String> onlineDrillNodeUUIDs = onlineEndpointUUIDs.entrySet().stream()
101108
.collect(Collectors.toMap(x -> DrillNode.create(x.getKey()), x -> x.getValue()));
@@ -112,7 +119,7 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
112119

113120
for (Wrapper wrapper : roots) {
114121
traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
115-
MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext);
122+
MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext, rm.minimumOperatorMemory());
116123
fragment.getNode().getRoot().accept(calculator, fragment);
117124
NodeResources.merge(totalNodeResources, fragment.getResourceMap());
118125
operators.entrySet()
@@ -122,6 +129,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
122129
}));
123130
}
124131

132+
if (logger.isDebugEnabled()) {
133+
logger.debug(" Total node resource requirements for the plan is {}", getJSONFromResourcesMap(totalNodeResources));
134+
}
135+
125136
final QueryQueueConfig queueConfig;
126137
try {
127138
queueConfig = this.rm.selectQueue(max(totalNodeResources.values()));
@@ -130,8 +141,10 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
130141
}
131142

132143
Map<DrillNode,
133-
List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits(operators, totalNodeResources,
134-
queueConfig.getMaxQueryMemoryInMBPerNode());
144+
List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators =
145+
ensureOperatorMemoryWithinLimits(operators, totalNodeResources,
146+
convertMBToBytes(Math.min(queueConfig.getMaxQueryMemoryInMBPerNode(),
147+
queueConfig.getQueueTotalMemoryInMB(onlineEndpointUUIDs.size()))));
135148
memoryAdjustedOperators.entrySet().stream().forEach((x) -> {
136149
Map<PhysicalOperator, Long> memoryPerOperator = x.getValue().stream()
137150
.collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(),
@@ -140,9 +153,17 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
140153
this.operators.put(x.getKey(), memoryPerOperator);
141154
});
142155

156+
if (logger.isDebugEnabled()) {
157+
logger.debug(" Total node resource requirements after adjustment {}", getJSONFromResourcesMap(totalNodeResources));
158+
}
159+
143160
this.rm.setCost(convertToUUID(totalNodeResources, onlineDrillNodeUUIDs));
144161
}
145162

163+
private long convertMBToBytes(long value) {
164+
return value * 1024 * 1024;
165+
}
166+
146167
private Map<String, NodeResources> convertToUUID(Map<DrillNode, NodeResources> nodeResourcesMap,
147168
Map<DrillNode, String> onlineDrillNodeUUIDs) {
148169
Map<String, NodeResources> nodeResourcesPerUUID = new HashMap<>();
@@ -172,50 +193,81 @@ private NodeResources max(Collection<NodeResources> resources) {
172193
*/
173194
private Map<DrillNode, List<Pair<PhysicalOperator, Long>>>
174195
ensureOperatorMemoryWithinLimits(Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryPerOperator,
175-
Map<DrillNode, NodeResources> nodeResourceMap, long nodeLimit) {
196+
Map<DrillNode, NodeResources> nodeResourceMap, long nodeLimit) throws ExecutionSetupException {
176197
// Get the physical operators which are above the node memory limit.
177-
Map<DrillNode, List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = new HashMap<>();
178-
memoryPerOperator.entrySet().stream().forEach((entry) -> {
179-
onlyMemoryAboveLimitOperators.putIfAbsent(entry.getKey(), new ArrayList<>());
180-
if (nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit) {
181-
onlyMemoryAboveLimitOperators.get(entry.getKey()).addAll(entry.getValue());
182-
}
183-
});
184-
198+
Map<DrillNode,
199+
List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = memoryPerOperator.entrySet()
200+
.stream()
201+
.filter(entry -> nodeResourceMap.get(entry.getKey()).getMemoryInBytes() > nodeLimit)
202+
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
185203

186204
// Compute the total memory required by the physical operators on the drillbits which are above node limit.
187205
// Then use the total memory to adjust the memory requirement based on the permissible node limit.
188206
Map<DrillNode, List<Pair<PhysicalOperator, Long>>> memoryAdjustedDrillbits = new HashMap<>();
189207
onlyMemoryAboveLimitOperators.entrySet().stream().forEach(
190-
entry -> {
191-
Long totalMemory = entry.getValue().stream().mapToLong(Pair::getValue).sum();
192-
List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorMemory -> {
208+
CheckedConsumer.throwingConsumerWrapper(entry -> {
209+
Long totalBufferedOperatorsMemoryReq = entry.getValue().stream().mapToLong(Pair::getValue).sum();
210+
Long nonBufferedOperatorsMemoryReq = nodeResourceMap.get(entry.getKey()).getMemoryInBytes() - totalBufferedOperatorsMemoryReq;
211+
Long bufferedOperatorsMemoryLimit = nodeLimit - nonBufferedOperatorsMemoryReq;
212+
if (bufferedOperatorsMemoryLimit < 0 || nonBufferedOperatorsMemoryReq < 0) {
213+
logger.error(" Operator memory requirements for buffered operators {} or non buffered operators {} is negative", bufferedOperatorsMemoryLimit,
214+
nonBufferedOperatorsMemoryReq);
215+
throw new ExecutionSetupException("Operator memory requirements for buffered operators " + bufferedOperatorsMemoryLimit + " or non buffered operators " +
216+
nonBufferedOperatorsMemoryReq + " is less than zero");
217+
}
218+
List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorAndMemory -> {
193219
// formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit.
194-
return Pair.of(operatorMemory.getKey(), (long) Math.ceil(operatorMemory.getValue()/totalMemory * nodeLimit));
220+
return Pair.of(operatorAndMemory.getKey(),
221+
Math.max(this.rm.minimumOperatorMemory(),
222+
(long) Math.ceil(operatorAndMemory.getValue()/totalBufferedOperatorsMemoryReq * bufferedOperatorsMemoryLimit)));
195223
}).collect(Collectors.toList());
196224
memoryAdjustedDrillbits.put(entry.getKey(), adjustedMemory);
197225
NodeResources nodeResources = nodeResourceMap.get(entry.getKey());
198-
nodeResources.setMemoryInBytes(adjustedMemory.stream().mapToLong(Pair::getValue).sum());
199-
}
226+
nodeResources.setMemoryInBytes(nonBufferedOperatorsMemoryReq + adjustedMemory.stream().mapToLong(Pair::getValue).sum());
227+
})
200228
);
201229

230+
checkIfWithinLimit(nodeResourceMap, nodeLimit);
231+
202232
// Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not
203233
// adjusted for memory.
204-
Map<DrillNode, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
205-
memoryPerOperator.entrySet().stream().filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())).forEach(
206-
operatorMemory -> {
207-
allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
208-
}
209-
);
234+
Map<DrillNode,
235+
List<Pair<PhysicalOperator, Long>>> allDrillbits = memoryPerOperator.entrySet()
236+
.stream()
237+
.filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey()))
238+
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
210239

211240
memoryAdjustedDrillbits.entrySet().stream().forEach(
212-
operatorMemory -> {
213-
allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
214-
}
215-
);
241+
operatorMemory -> allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()));
216242

217243
// At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and
218244
// the ratio of their requirements.
219245
return allDrillbits;
220246
}
247+
248+
private void checkIfWithinLimit(Map<DrillNode, NodeResources> nodeResourcesMap, long nodeLimit) throws ExecutionSetupException {
249+
for (Map.Entry<DrillNode, NodeResources> entry : nodeResourcesMap.entrySet()) {
250+
if (entry.getValue().getMemoryInBytes() > nodeLimit) {
251+
logger.error(" Memory requirement for the query cannot be adjusted." +
252+
" Memory requirement {} (in bytes) for a node {} is greater than limit {}", entry.getValue()
253+
.getMemoryInBytes(), entry.getKey(), nodeLimit);
254+
throw new ExecutionSetupException("Minimum memory requirement "
255+
+ entry.getValue().getMemoryInBytes() + " for a node " + entry.getKey() + " is greater than limit: " + nodeLimit);
256+
}
257+
}
258+
}
259+
260+
private String getJSONFromResourcesMap(Map<DrillNode, NodeResources> resourcesMap) {
261+
String json = "";
262+
try {
263+
json = new ObjectMapper().writeValueAsString(resourcesMap.entrySet()
264+
.stream()
265+
.collect(Collectors.toMap(entry -> entry.getKey()
266+
.toString(), Map.Entry::getValue)));
267+
} catch (JsonProcessingException exception) {
268+
logger.error(" Cannot convert the Node resources map to json ");
269+
}
270+
271+
return json;
272+
}
221273
}

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717
*/
1818
package org.apache.drill.exec.planner.fragment;
1919

20-
import java.util.ArrayList;
21-
import java.util.Iterator;
22-
import java.util.List;
23-
2420
import org.apache.drill.exec.ops.QueryContext;
2521
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
2622
import org.apache.drill.exec.physical.base.Exchange;
2723
import org.apache.drill.exec.physical.base.PhysicalOperator;
28-
import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
2924
import org.apache.drill.exec.work.foreman.ForemanSetupException;
30-
3125
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
3226

27+
import java.util.ArrayList;
28+
import java.util.Iterator;
29+
import java.util.List;
30+
3331
public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
3432
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fragment.class);
3533

0 commit comments

Comments
 (0)